小编Daw*_*wid的帖子

Apache Spark:重新分区、排序和缓存对连接的影响

我正在探索将表加入到自身时 Spark 的行为。我正在使用数据块。

我的虚拟场景是:

  1. 将外部表读取为数据帧 A(底层文件采用 delta 格式)

  2. 将数据框 B 定义为仅选择某些列的数据框 A

  3. 在 column1 和 column2 上连接数据框 A 和 B

(是的,这没有多大意义,我只是在尝试了解 Spark 的底层机制)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])
Run Code Online (Sandbox Code Playgroud)

我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)

最后,我重新分区、排序和缓存

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5"))))) …
Run Code Online (Sandbox Code Playgroud)

bigdata apache-spark pyspark azure-databricks delta-lake

10
推荐指数
1
解决办法
499
查看次数

Airflow中的动态任务定义

我目前正在尝试使用Airflow编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(早期)运算符的输出.

在下面的代码中,t1更新了一个带有新记录的文本文件(这些记录实际上是从外部队列中读取的,但为简单起见,我在这里将它们硬编码为A,B和C).然后,我想为从该文本文件读取的每个记录创建单独的运算符.这些运算符将分别创建目录A,B和C,并且在Airflow UI中将被视为单独的bash进程Create_directory_A,Create_directory_B和Create_directory_C.

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    txt_file = open('text_file.txt', 'r')
    return [element for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )

    t1 >> t2
Run Code Online (Sandbox Code Playgroud)

Airflow的文档中,我可以看到调度程序将定期执行它[DAG]以反映更改(如果有的话).这是否意味着存在这样的风险:即使我的t1运算符在t2之前执行,也会在更新之前为记录列表创建bash运算符(就像评估DAG时那样)?

python bash orchestration airflow

9
推荐指数
1
解决办法
2108
查看次数

Spark:优化将DataFrame写入SQL Server

我正在使用下面的代码向SQL Server的表中写入43列和大约2,000,000行的DataFrame:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()
Run Code Online (Sandbox Code Playgroud)

不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?

我尝试设置 rewriteBatchedStatements=true

谢谢。

sql database sql-server scala apache-spark

9
推荐指数
2
解决办法
1794
查看次数

Deequ中的唯一性检查

我目前正在探索Deequ库,并且试图了解是否有可能检查列组合的唯一性。

这段代码

.hasUniqueness(Seq("col1", "col2"), Check.IsOne))
Run Code Online (Sandbox Code Playgroud)

似乎分别计算每个列的唯一性(如果我错了,则更正)

谢谢

scala amazon-web-services amazon-deequ

5
推荐指数
1
解决办法
119
查看次数