小编Cdr*_*Cdr的帖子

pyspark一次读取多个csv文件

我正在使用 SPARK 读取 hdfs 中的文件。有一种情况,我们从遗留系统中以 csv 格式获取文件作为块。

ID1_FILENAMEA_1.csv
ID1_FILENAMEA_2.csv
ID1_FILENAMEA_3.csv
ID1_FILENAMEA_4.csv
ID2_FILENAMEA_1.csv
ID2_FILENAMEA_2.csv
ID2_FILENAMEA_3.csv
Run Code Online (Sandbox Code Playgroud)

该文件使用 HiveWareHouse Connector 加载到 HIVE 中的 FILENAMEA,几乎不需要添加默认值等转换。同样,我们有大约 70 张桌子。Hive 表以 ORC 格式创建。表根据 ID 进行分区。现在,我正在一一处理所有这些文件。这需要很多时间。

我想让这个过程更快。文件将以 GB 为单位。

有没有办法同时读取所有 FILENAMEA 文件并将其加载到 HIVE 表中。

hive apache-spark pyspark

17
推荐指数
1
解决办法
4万
查看次数

如何重新运行已经使用 TriggerDagrunoperator 执行的 dag?

我有一个 dag,我正在使用以下运算符列表

  • TriggerDagrunoperator-触发另一个dag
  • ExternalTask​​Sensor-获取触发的dag的状态

我的用例:例如,如果整个流程成功完成,并且我发现中间的数据处理存在一些问题。我想从问题点开始针对特定执行日期重新运行作业。我清除了下游,这使得作业重新运行。但是,TriggerDagrunoperator 因以下问题而失败。

airflow.exceptions.DagRunAlreadyExists:运行 id 已触发_:dag id 已存在

我想清除这一点,并且需要在该特定执行日期再次重新运行 dag。有更好的方法来实现这一点吗?

python airflow airflow-scheduler

3
推荐指数
1
解决办法
8292
查看次数