我使用了参数 files =["abc.txt"]。我从气流文档中获取了信息... https://airflow.readthedocs.io/en/stable/_modules/airflow/operators/email_operator.html
但我收到找不到该文件的错误。我的问题是这个气流将从哪里选择我的文件。是来自 Composer 环境中的 GCS Bucket 还是 DAG 文件夹?
我需要在哪里上传文件以及“文件”参数的正确语法是什么?
提前致谢。
我还是 Beam 的新手,但是您究竟如何从 GCS 存储桶中的 CSV 文件中读取数据?我基本上使用 Beam 将这些文件转换为 Pandas 数据帧,然后应用 sklearn 模型来“训练”这些数据。我见过的大多数示例都预先定义了标题,我希望这个 Beam 管道可以推广到标题肯定不同的任何文件。有一个名为beam_utils的库可以完成我想做的事情,但后来我遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'
代码示例:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# The error occurs in this import
from beam_utils.sources import CsvFileSource
options = {
'project': 'my-project',
'runner:': 'DirectRunner',
'streaming': False
}
pipeline_options = PipelineOptions(flags=[], **options)
class Printer(beam.DoFn):
def process(self, element):
print(element)
with beam.Pipeline(options=pipeline_options) as p: # Create the Pipeline with the specified options.
data = (p
| …Run Code Online (Sandbox Code Playgroud)