标签: google-cloud-dataflow

如何为数据流指定工作者数量?

我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?

谢谢

google-cloud-dataflow apache-beam

10
推荐指数
0
解决办法
1065
查看次数

来源与PTransform

我是该项目的新手,我正在尝试在Dataflow和数据库之间创建一个连接器.

文档明确指出我应该使用Source和Sink但我看到很多人直接使用与PInput或PDone相关联的PTransform.

源/接收器API处于实验阶段(使用PTransform解释所有示例),但似乎更容易将其与自定义运行器集成(例如:spark).

如果我参考代码,则使用这两种方法.我看不到任何使用PTransform API会更有趣的用例.

Source/Sink API是否应该重新设计PTranform API?

我是否遗漏了能明确区分这两种方法的东西?

Source/Sink API是否足够稳定,被认为是编码输入和输出的好方法?

谢谢你的建议!

java google-cloud-dataflow

10
推荐指数
1
解决办法
308
查看次数

如何计算Google数据流的费用?

我的公司正在评估我们是否可以使用Google Dataflow.

我在Google Cloud Platform上运行了数据流.控制台在右侧的"预留CPU时间"字段中显示5小时25分钟.

工人配置:n1-standard-4

开始8名工人......

如何计算数据流的成本?根据这个页面 ,每小时每GCEU的价格是0.01美元,我如何找到我的数据流消耗的GCEU数量和小时数?

云数据流控制台

google-compute-engine google-cloud-dataflow

9
推荐指数
1
解决办法
2278
查看次数

在Apache Beam中为不同的BigQuery表写入不同的值

假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.

如何使用Apache Beam BigQueryIOAPI 执行此操作?

google-bigquery google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
2439
查看次数

数据流大侧输入中的Apache Beam

这与此问题最相似.

我正在Dataflow 2.x中创建一个管道,它从Pubsub队列中获取流输入.进来的每条消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并在写入数据库之前将所有相关值附加到它(基于密钥).

麻烦的是来自BigQuery的映射数据集非常大 - 任何将其用作边输入的尝试都失败,Dataflow运行器抛出错误"java.lang.IllegalArgumentException:ByteString将太长".我尝试了以下策略:

1)侧输入

  • 如上所述,映射数据(显然)太大而无法做到这一点.如果我在这里错了或有解决办法,请告诉我,因为这将是最简单的解决方案.

2)键值对映射

  • 在这种策略中,我读了管道的第一部分的BigQuery数据和PubSub的信息数据,然后运行每个通过更改PCollections到键值对每个值帕尔多变换.然后,我运行Merge.Flatten转换和GroupByKey转换,将相关的映射数据附加到每条消息.
  • 这里的问题是流数据需要将窗口与其他数据合并,因此我必须将窗口应用于大的有界BigQuery数据.它还要求两个数据集上的窗口策略相同.但是对于有界数据没有窗口策略是有意义的,并且我做的少量窗口尝试只是在一个窗口中发送所有BQ数据然后再也不发送它.它需要与每个传入的pubsub消息连接.

3)直接在ParDo(DoFn)中调用BQ

  • 这似乎是一个好主意 - 让每个工作人员声明一个静态的地图数据实例.如果它不存在,那么直接调用BigQuery来获取它.不幸的是,每次都会抛出BigQuery的内部错误(就像在整个消息中只是说"内部错误").向Google提交支持票后,他们告诉我,基本上,"你不能这样做".

似乎这个任务并不真正适合"令人尴尬的可并行化"模型,所以我在这里咆哮错误的树吗?

编辑:

即使在数据流中使用高内存机器并尝试将侧输入到地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long

这是我正在使用的代码的示例(伪):

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != …
Run Code Online (Sandbox Code Playgroud)

apache google-bigquery google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
3026
查看次数

数据流,使用客户提供的加密密钥加载文件

尝试使用CSEK加载GCS文件时,我收到数据流错误

[ERROR] The target object is encrypted by a customer-supplied encryption key
Run Code Online (Sandbox Code Playgroud)

我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.

是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?

    // Fails
    p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..); 
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
576
查看次数

将数据流管道的输出写入分区目标

我们有一个单独的流媒体事件源,每秒有数千个事件,这些事件都标有一个id,用于标识事件所属的数万个客户中的哪一个.我们想使用这个事件源来填充数据仓库(在流模式下),但是,我们的事件源不是持久性的,所以我们还希望将原始数据存档在GCS中,以便我们可以通过我们的数据重放它仓库管道,如果我们进行需要它的更改.由于数据保留要求,我们持久存储的任何原始数据都需要由客户进行分区,以便我们可以轻松删除它.

在Dataflow中解决这个问题最简单的方法是什么?目前我们正在使用自定义接收器创建数据流作业,该接收器将数据写入GCS/BigQuery上的每个客户的文件,这是明智的吗?

google-cloud-storage google-cloud-dataflow

8
推荐指数
1
解决办法
1037
查看次数

在从Dataflow插入BigQuery之前验证行

根据我们 如何从数据流加载Bigquery表时设置maximum_bad_records?maxBadRecords从Dataflow将数据加载到BigQuery时,目前无法设置配置.建议在将数据插入BigQuery之前验证Dataflow作业中的行.

如果我有TableSchema和a TableRow,我该如何确保可以安全地将行插入表中?

必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败.

更新:

我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录云存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery.这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载).像时间戳这样的东西也需要格式化以与BigQuery一起使用.这个作业的一些变体会在不同的输入上运行并写入不同的表.

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery.我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入TableRow对象.

我的问题是数据混乱.有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见.有时,应包含字符串的属性包含整数,反之亦然.有时会有一个数组或一个应该有字符串的对象.

理想情况下,我想接受TableRow并通过TableSchema并询问"这有效吗?".

因为这是不可能的,所以我做的是查看TableSchema对象并尝试自己验证/转换值.如果TableSchema说属性是STRINGI 类型,则value.toString()在将其添加到之前TableRow.如果是,INTEGER我检查它是a Integer,Long或者BigInteger等等.这种方法的问题在于我只是猜测BigQuery会起什么作用.它接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况.

根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字).它也失败了表面有用的消息,如'BigQuery导入作业"dataflow_job_xxx"失败.原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"已完成错误:errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射'.也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性,价值是什么?没有这些信息,它也可以说"坏数据".

据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入云存储中的临时区域,然后在所有内容完成后启动加载.这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行.我还没有在流模式下运行任何工作,但是我不确定它会有什么不同,从我的(公认有限的)理解基本原理是相同的,它只是批量大小更小.

人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作.人们如何做到这一点?

google-bigquery google-cloud-dataflow

8
推荐指数
1
解决办法
1906
查看次数

'_UnwindowedValues'类型的对象没有len()意味着什么?

我正在使用Dataflow 0.5.5 Python.在非常简单的代码中遇到以下错误:

print(len(row_list))
Run Code Online (Sandbox Code Playgroud)

row_list是一个清单.完全相同的代码,相同的数据和相同的管道在DirectRunner上运行完全正常,但在DataflowRunner上抛出以下异常.它是什么意思以及我如何解决它?

job name: `beamapp-root-0216042234-124125`

    (f14756f20f567f62): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
    with op.scoped_metrics_container:
  File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
    op.start()
  File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
    def start(self):
  File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
    with self.scoped_start_state:
  File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
    self.output(windowed_value)
  File "dataflow_worker/executor.py", line 152, …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1657
查看次数

Apache Beam - 使用无界PCollection进行集成测试

我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见

有关我们管道的详情:

  • 我们使用PubsubIO我们的数据源(无界PCollection)
  • 中间变换包括自定义CombineFn和非常简单的窗口/触发策略
  • 我们最终的变换JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j

目前的测试方法:

  • 在运行测试的计算机上运行Google Cloud的Pub/Sub模拟器
  • 构建内存中的Neo4j数据库并将其URI传递给我们的管道选项
  • 通过调用运行管道 OurPipeline.main(TestPipeline.convertToArgs(options)
  • 使用Google Cloud的Java Pub/Sub客户端库将消息发布到测试主题(使用Pub/Sub模拟器),该主题PubsubIO将从
  • 数据应该流经管道并最终命中我们的内存中的Neo4j实例
  • 在Neo4j中对这些数据的存在做出简单的断言

这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.

我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunnerpipeline.run()( pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.

所以,我有几个问题:

1)有没有办法运行管道然后稍后手动停止?

2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.

3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.

如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!

java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
961
查看次数