我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?
谢谢
我是该项目的新手,我正在尝试在Dataflow和数据库之间创建一个连接器.
文档明确指出我应该使用Source和Sink但我看到很多人直接使用与PInput或PDone相关联的PTransform.
源/接收器API处于实验阶段(使用PTransform解释所有示例),但似乎更容易将其与自定义运行器集成(例如:spark).
如果我参考代码,则使用这两种方法.我看不到任何使用PTransform API会更有趣的用例.
Source/Sink API是否应该重新设计PTranform API?
我是否遗漏了能明确区分这两种方法的东西?
Source/Sink API是否足够稳定,被认为是编码输入和输出的好方法?
谢谢你的建议!
我的公司正在评估我们是否可以使用Google Dataflow.
我在Google Cloud Platform上运行了数据流.控制台在右侧的"预留CPU时间"字段中显示5小时25分钟.
工人配置:n1-standard-4
开始8名工人......
如何计算数据流的成本?根据这个页面 ,每小时每GCEU的价格是0.01美元,我如何找到我的数据流消耗的GCEU数量和小时数?
假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.
如何使用Apache Beam BigQueryIOAPI 执行此操作?
这与此问题最相似.
我正在Dataflow 2.x中创建一个管道,它从Pubsub队列中获取流输入.进来的每条消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并在写入数据库之前将所有相关值附加到它(基于密钥).
麻烦的是来自BigQuery的映射数据集非常大 - 任何将其用作边输入的尝试都失败,Dataflow运行器抛出错误"java.lang.IllegalArgumentException:ByteString将太长".我尝试了以下策略:
1)侧输入
2)键值对映射
3)直接在ParDo(DoFn)中调用BQ
似乎这个任务并不真正适合"令人尴尬的可并行化"模型,所以我在这里咆哮错误的树吗?
编辑:
即使在数据流中使用高内存机器并尝试将侧输入到地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long
这是我正在使用的代码的示例(伪):
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != …Run Code Online (Sandbox Code Playgroud) 尝试使用CSEK加载GCS文件时,我收到数据流错误
[ERROR] The target object is encrypted by a customer-supplied encryption key
Run Code Online (Sandbox Code Playgroud)
我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.
是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?
// Fails
p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..);
Run Code Online (Sandbox Code Playgroud) google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam
我们有一个单独的流媒体事件源,每秒有数千个事件,这些事件都标有一个id,用于标识事件所属的数万个客户中的哪一个.我们想使用这个事件源来填充数据仓库(在流模式下),但是,我们的事件源不是持久性的,所以我们还希望将原始数据存档在GCS中,以便我们可以通过我们的数据重放它仓库管道,如果我们进行需要它的更改.由于数据保留要求,我们持久存储的任何原始数据都需要由客户进行分区,以便我们可以轻松删除它.
在Dataflow中解决这个问题最简单的方法是什么?目前我们正在使用自定义接收器创建数据流作业,该接收器将数据写入GCS/BigQuery上的每个客户的文件,这是明智的吗?
根据我们
如何从数据流加载Bigquery表时设置maximum_bad_records?maxBadRecords从Dataflow将数据加载到BigQuery时,目前无法设置配置.建议在将数据插入BigQuery之前验证Dataflow作业中的行.
如果我有TableSchema和a TableRow,我该如何确保可以安全地将行插入表中?
必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败.
更新:
我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录云存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery.这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载).像时间戳这样的东西也需要格式化以与BigQuery一起使用.这个作业的一些变体会在不同的输入上运行并写入不同的表.
从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery.我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入TableRow对象.
我的问题是数据混乱.有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见.有时,应包含字符串的属性包含整数,反之亦然.有时会有一个数组或一个应该有字符串的对象.
理想情况下,我想接受TableRow并通过TableSchema并询问"这有效吗?".
因为这是不可能的,所以我做的是查看TableSchema对象并尝试自己验证/转换值.如果TableSchema说属性是STRINGI 类型,则value.toString()在将其添加到之前TableRow.如果是,INTEGER我检查它是a Integer,Long或者BigInteger等等.这种方法的问题在于我只是猜测BigQuery会起什么作用.它接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况.
根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字).它也失败了表面有用的消息,如'BigQuery导入作业"dataflow_job_xxx"失败.原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"已完成错误:errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射'.也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性,价值是什么?没有这些信息,它也可以说"坏数据".
据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入云存储中的临时区域,然后在所有内容完成后启动加载.这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行.我还没有在流模式下运行任何工作,但是我不确定它会有什么不同,从我的(公认有限的)理解基本原理是相同的,它只是批量大小更小.
人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作.人们如何做到这一点?
我正在使用Dataflow 0.5.5 Python.在非常简单的代码中遇到以下错误:
print(len(row_list))
Run Code Online (Sandbox Code Playgroud)
row_list是一个清单.完全相同的代码,相同的数据和相同的管道在DirectRunner上运行完全正常,但在DataflowRunner上抛出以下异常.它是什么意思以及我如何解决它?
job name: `beamapp-root-0216042234-124125`
(f14756f20f567f62): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
with op.scoped_metrics_container:
File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
op.start()
File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
def start(self):
File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
with self.scoped_start_state:
File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
with self.shuffle_source.reader() as reader:
File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 152, …Run Code Online (Sandbox Code Playgroud) 我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见
有关我们管道的详情:
PubsubIO我们的数据源(无界PCollection)CombineFn和非常简单的窗口/触发策略JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j目前的测试方法:
OurPipeline.main(TestPipeline.convertToArgs(options)PubsubIO将从这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.
我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunner和pipeline.run()(不 pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.
所以,我有几个问题:
1)有没有办法运行管道然后稍后手动停止?
2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.
3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.
如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!
java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam