小编Tla*_*zal的帖子

如何在 Airflow 中使用电子邮件操作符附加文件

我使用了参数 files =["abc.txt"]。我从气流文档中获取了信息... https://airflow.readthedocs.io/en/stable/_modules/airflow/operators/email_operator.html

但我收到找不到该文件的错误。我的问题是这个气流将从哪里选择我的文件。是来自 Composer 环境中的 GCS Bucket 还是 DAG 文件夹?

我需要在哪里上传文件以及“文件”参数的正确语法是什么?

提前致谢。

google-cloud-platform airflow google-cloud-composer

5
推荐指数
1
解决办法
2万
查看次数

Python/Apache-Beam:如何将文本文件解析为 CSV?

我还是 Beam 的新手,但是您究竟如何从 GCS 存储桶中的 CSV 文件中读取数据?我基本上使用 Beam 将这些文件转换为 Pandas 数据帧,然后应用 sklearn 模型来“训练”这些数据。我见过的大多数示例都预先定义了标题,我希望这个 Beam 管道可以推广到标题肯定不同的任何文件。有一个名为beam_utils的库可以完成我想做的事情,但后来我遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'

代码示例:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner:': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2687
查看次数