我是 Pipeline 世界和 Google API DataFlow 的新手。
我想使用 sqlQuery 从 BigQuery 读取数据。当我读取所有数据库时,它工作正常。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
BigQueryIO.Read
.named("Read")
.from("test:DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)
但是当我使用 fromQuery 时出现错误。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
BigQueryIO.Read
.named("Read")
.fromQuery("SELECT * FROM DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)
错误:
线程“main”中出现异常 java.lang.IllegalArgumentException:查询“SELECT * FROM DataSetTest.data”的验证失败。如果查询依赖于管道的早期阶段,则可以使用#withoutValidation 禁用此验证。
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)
在com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)
在com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
在 com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)
在com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)
在 Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)
引起原因:java.lang.NullPointerException:必须指定必需参数projectId。
在 com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
在 com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)
在 com.google.api.services.bigquery.Bigquery$Jobs$Query。(Bigquery.java:1751)
在 com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)
在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)
... 6 更多 …
java google-app-engine dataflow google-bigquery google-cloud-dataflow
我知道 Google Dataflow 仅正式支持 Google Cloud Storage、BigQuery、Avro 文件或开箱即用的 Pub/Sub 中的文件作为数据流的 I/O。
但由于它有一个用于自定义源和接收器的 API,我想知道 MongoDB 是否有一些管道 I/O 实现?
现在,我必须将数据迁移到 BigQuery 或编写整个 Pipeline I/O 实现,然后才能知道 Google Dataflow 是否是解决我当前问题的可行解决方案。
我尝试用谷歌搜索并查看当前的 SDK 问题,但没有看到任何相关内容。我什至开始怀疑我是否错过了 Google Dataflow 概念和文档中的一些非常基本的内容,这些内容完全使使用 MongoDB 作为数据源的最初想法无效。
我希望使用 Python SDK 在 GCS 中摄取和写入 Avro 文件。目前 Avro 利用 Python SDK 可以实现这一点吗?如果是这样我该怎么做?我在源代码中看到了关于此的 TODO 评论,所以我不太乐观。
现在我只能使用 ParDo 获取类中的 RunTime 值,还有其他方法可以像在我的函数中一样使用运行时参数吗?
这是我现在得到的代码:
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--firestore_document',default='')
def run(argv=None):
parser = argparse.ArgumentParser()
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p
| 'Create inputs' >> beam.Create([''])
| 'Call Firestore' >> beam.ParDo(
CallFirestore(user_options.firestore_document))
| 'Read DB2' >> beam.Map(ReadDB2))
Run Code Online (Sandbox Code Playgroud)
我希望 user_options.firestore_document 无需执行 ParDo 即可在其他功能中使用
python parameters google-cloud-platform google-cloud-dataflow apache-beam
我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现。我在做一个ParDo与DoFn每个JSON。
我还没有看到任何在线教程说明如何从 apache beam DoFnDataflow调用外部 API 端点。
我正在使用JAVABeam 的 SDK。我学习的一些教程解释了使用startBundle和FinishBundle但我不清楚如何使用它
我还是 Beam 的新手,但是您究竟如何从 GCS 存储桶中的 CSV 文件中读取数据?我基本上使用 Beam 将这些文件转换为 Pandas 数据帧,然后应用 sklearn 模型来“训练”这些数据。我见过的大多数示例都预先定义了标题,我希望这个 Beam 管道可以推广到标题肯定不同的任何文件。有一个名为beam_utils的库可以完成我想做的事情,但后来我遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'
代码示例:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# The error occurs in this import
from beam_utils.sources import CsvFileSource
options = {
'project': 'my-project',
'runner:': 'DirectRunner',
'streaming': False
}
pipeline_options = PipelineOptions(flags=[], **options)
class Printer(beam.DoFn):
def process(self, element):
print(element)
with beam.Pipeline(options=pipeline_options) as p: # Create the Pipeline with the specified options.
data = (p
| …Run Code Online (Sandbox Code Playgroud) 嗨,我有几个查询,我想使用 Apache Beam 依次运行和保存结果,我见过一些类似的问题,但找不到答案。我习惯于使用 Airflow 设计管道,而我对 Apache Beam 还是比较陌生。我正在使用 Dataflow 运行程序。这是我现在的代码:我希望 query2 仅在 query1 结果保存到相应表后运行。我如何链接它们?
PCollection<TableRow> resultsStep1 = getData("Run Query 1",
"Select * FROM basetable");
resultsStep1.apply("Save Query1 data",
BigQueryIO.writeTableRows()
.withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
.to("resultsStep1")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
PCollection<TableRow> resultsStep2 = getData("Run Query 2",
"Select * FROM resultsStep1");
resultsStep2.apply("Save Query2 data",
BigQueryIO.writeTableRows()
.withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
.to("resultsStep2")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
Run Code Online (Sandbox Code Playgroud)
这是我的 getData 函数定义:
private PCollection<TableRow> getData(final String taskName, final String query) {
return pipeline.apply(taskName,
BigQueryIO.readTableRowsWithSchema()
.fromQuery(query)
.usingStandardSql()
.withCoder(TableRowJsonCoder.of()));
}
Run Code Online (Sandbox Code Playgroud)
编辑(更新):结果:
You can’t sequence the completion of a …
我打算在云存储数据流模板上使用Pub/Sub 到文本文件,并在写入云存储之前进行少量自定义,例如处理(按摩)PubSub 消息。
我编写了 apache-beam 管道代码,但对如何部署它感到困惑。它使用的参数将与Cloud Storage 上的 Pub/Sub 到文本文件完全相同
从文档中我了解到我可以使用 Google 提供的模板之一或创建自己的模板。但是,不是创建我自己的模板,而是有更好的方法来自定义 Google 提供的模板,因为它足以满足我的大部分要求
需要一些帮助。我有一些从 Pub/Sub 读取并写入 GCS 中的批处理文件的琐碎任务,但是在使用 fileio.WriteToFiles 时遇到了一些困难
with beam.Pipeline(options=pipeline_options) as p:
input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| 'Parse' >> beam.Map(parse_json)
| ' data w' >> beam.WindowInto(
FixedWindows(60),
accumulation_mode=AccumulationMode.DISCARDING
))
event_data = (input
| 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
| 'encode et' >> beam.Map(lambda x: json.dumps(x))
| 'write events to file' >> fileio.WriteToFiles(
path='gs://extention/ga_analytics/events/', shards=0))
Run Code Online (Sandbox Code Playgroud)
窗口触发后我需要一个文件,但文件数等于来自 Pubsub 的消息数,有人可以帮助我吗? 当前输出文件, 但我只需要一个文件。
我正在使用数据流 kafka 到 bigquery 模板。启动数据流作业后,它会在队列中停留一段时间,然后失败并显示以下错误:
Error occurred in the launcher container: Template launch failed. See console logs.
Run Code Online (Sandbox Code Playgroud)
查看日志时,我看到以下堆栈跟踪:
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.run(KafkaToBigQuery.java:343)
at com.google.cloud.teleport.v2.templates.KafkaToBigQuery.main(KafkaToBigQuery.java:222)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata –
Run Code Online (Sandbox Code Playgroud)
在启动工作时,我提供了以下参数:
我的 kafka 主题只包含消息:你好
kafka 安装在 gcp 实例中,该实例与数据流工作者位于同一区域和子网中。
dataflow apache-kafka google-cloud-platform google-cloud-dataflow