我正在探索将表加入到自身时 Spark 的行为。我正在使用数据块。
我的虚拟场景是:
将外部表读取为数据帧 A(底层文件采用 delta 格式)
将数据框 B 定义为仅选择某些列的数据框 A
在 column1 和 column2 上连接数据框 A 和 B
(是的,这没有多大意义,我只是在尝试了解 Spark 的底层机制)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Run Code Online (Sandbox Code Playgroud)
我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)
最后,我重新分区、排序和缓存
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5"))))) …
Run Code Online (Sandbox Code Playgroud) 我目前正在尝试使用Airflow编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(早期)运算符的输出.
在下面的代码中,t1更新了一个带有新记录的文本文件(这些记录实际上是从外部队列中读取的,但为简单起见,我在这里将它们硬编码为A,B和C).然后,我想为从该文本文件读取的每个记录创建单独的运算符.这些运算符将分别创建目录A,B和C,并且在Airflow UI中将被视为单独的bash进程Create_directory_A,Create_directory_B和Create_directory_C.
dag = DAG('Test_DAG',
description="Lorem ipsum.",
start_date=datetime(2017, 3, 20),
schedule_interval=None,
catchup=False)
def create_text_file(list_of_rows):
text_file = open('text_file.txt', "w")
for row in list_of_rows:
text_file.write(row + '\n')
text_file.close()
def read_text():
txt_file = open('text_file.txt', 'r')
return [element for element in txt_file.readlines()]
t1 = PythonOperator(
task_id='Create_text_file',
python_callable=create_text_file,
op_args=[['A', 'B', 'C']],
dag=dag
)
for row in read_text():
t2 = BashOperator(
task_id='Create_directory_{}'.format(row),
bash_command="mkdir {{params.dir_name}}",
params={'dir_name': row},
dag=dag
)
t1 >> t2
Run Code Online (Sandbox Code Playgroud)
在Airflow的文档中,我可以看到调度程序将定期执行它[DAG]以反映更改(如果有的话).这是否意味着存在这样的风险:即使我的t1运算符在t2之前执行,也会在更新之前为记录列表创建bash运算符(就像评估DAG时那样)?
我正在使用下面的代码向SQL Server的表中写入43列和大约2,000,000行的DataFrame:
dataFrame
.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", tablename)
.option("user", user)
.option("password", password)
.save()
Run Code Online (Sandbox Code Playgroud)
不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?
我尝试设置 rewriteBatchedStatements=true
谢谢。
我目前正在探索Deequ库,并且试图了解是否有可能检查列组合的唯一性。
这段代码
.hasUniqueness(Seq("col1", "col2"), Check.IsOne))
Run Code Online (Sandbox Code Playgroud)
似乎分别计算每个列的唯一性(如果我错了,则更正)
谢谢
apache-spark ×2
scala ×2
airflow ×1
amazon-deequ ×1
bash ×1
bigdata ×1
database ×1
delta-lake ×1
pyspark ×1
python ×1
sql ×1
sql-server ×1