阅读Cloud Dataflow文档后,我仍然不确定如何从App Engine运行数据流作业.可能吗?我的后端用Python或Java编写是否相关?谢谢!
我们使用数据存储作为持久性后端在Google App Engine上运行应用程序.目前,应用程序主要具有"OLTP"功能和一些初步报告.在实施报告时,我们遇到使用数据存储区和GQL处理大量数据(数百万个对象)非常困难.为了使用适当的报告和商业智能功能增强我们的应用程序,我们认为最好设置ETL过程以将数据从Datastore移动到BigQuery.
最初我们考虑将ETL过程实现为App Engine cron作业,但看起来Dataflow也可用于此.我们有以下设置流程的要求
我的问题是
问题是这两种方法可行吗?哪一个更好的成本?有没有比上面两个更好的其他方式?
谢谢,
rizTaak
google-app-engine google-bigquery google-cloud-datastore google-cloud-dataflow
我们有一个单独的流媒体事件源,每秒有数千个事件,这些事件都标有一个id,用于标识事件所属的数万个客户中的哪一个.我们想使用这个事件源来填充数据仓库(在流模式下),但是,我们的事件源不是持久性的,所以我们还希望将原始数据存档在GCS中,以便我们可以通过我们的数据重放它仓库管道,如果我们进行需要它的更改.由于数据保留要求,我们持久存储的任何原始数据都需要由客户进行分区,以便我们可以轻松删除它.
在Dataflow中解决这个问题最简单的方法是什么?目前我们正在使用自定义接收器创建数据流作业,该接收器将数据写入GCS/BigQuery上的每个客户的文件,这是明智的吗?
我对使用Google Cloud Dataflow非常陌生.我想获得两个PCollections的笛卡尔积.举例来说,如果我有两个PCollections (1, 2)和("hello", "world"),它们的笛卡尔乘积是((1, "hello"), (1, "world"), (2, "hello"), (2, "world")).
任何想法我怎么能这样做?此外,由于笛卡尔积可能很大,我希望解决方案可以懒得创建产品,从而避免大量内存消耗.
谢谢!
根据我们
如何从数据流加载Bigquery表时设置maximum_bad_records?maxBadRecords从Dataflow将数据加载到BigQuery时,目前无法设置配置.建议在将数据插入BigQuery之前验证Dataflow作业中的行.
如果我有TableSchema和a TableRow,我该如何确保可以安全地将行插入表中?
必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败.
更新:
我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录云存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery.这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载).像时间戳这样的东西也需要格式化以与BigQuery一起使用.这个作业的一些变体会在不同的输入上运行并写入不同的表.
从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery.我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入TableRow对象.
我的问题是数据混乱.有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见.有时,应包含字符串的属性包含整数,反之亦然.有时会有一个数组或一个应该有字符串的对象.
理想情况下,我想接受TableRow并通过TableSchema并询问"这有效吗?".
因为这是不可能的,所以我做的是查看TableSchema对象并尝试自己验证/转换值.如果TableSchema说属性是STRINGI 类型,则value.toString()在将其添加到之前TableRow.如果是,INTEGER我检查它是a Integer,Long或者BigInteger等等.这种方法的问题在于我只是猜测BigQuery会起什么作用.它接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况.
根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字).它也失败了表面有用的消息,如'BigQuery导入作业"dataflow_job_xxx"失败.原因:(5db0b2cdab1557e0):项目"xxx"中的BigQuery作业"dataflow_job_xxx"已完成错误:errorResult:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射,错误:为非记录字段指定的JSON映射'.也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性,价值是什么?没有这些信息,它也可以说"坏数据".
据我所知,至少在批处理模式下运行时,Dataflow会将TableRow对象写入云存储中的临时区域,然后在所有内容完成后启动加载.这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行.我还没有在流模式下运行任何工作,但是我不确定它会有什么不同,从我的(公认有限的)理解基本原理是相同的,它只是批量大小更小.
人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作.人们如何做到这一点?
我想利用时间分区表的新BigQuery功能,但不确定这在1.6版本的Dataflow SDK中是否可行.
查看BigQuery JSON API,要创建一个分区表,需要传入一个
"timePartitioning": { "type": "DAY" }
Run Code Online (Sandbox Code Playgroud)
选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference.
我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?
这似乎与设置当前不可用的表到期时间类似.
我正在使用来自流式数据流管道的DatastoreIO,并在使用相同的密钥编写实体时收到错误.
2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
Run Code Online (Sandbox Code Playgroud)
如果我在密钥中使用随机数,那么事情可行,但我需要更新相同的密钥,那么有没有使用DataStoreIO执行此操作的事务方法?
static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
private static final long serialVersionUID = 0;
private final String namespace;
private final String kind;
CreateEntityFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
}
public Entity makeEntity(String key, Tile tile) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(kind, key );
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder.build()); …Run Code Online (Sandbox Code Playgroud) 我正在使用Dataflow 0.5.5 Python.在非常简单的代码中遇到以下错误:
print(len(row_list))
Run Code Online (Sandbox Code Playgroud)
row_list是一个清单.完全相同的代码,相同的数据和相同的管道在DirectRunner上运行完全正常,但在DataflowRunner上抛出以下异常.它是什么意思以及我如何解决它?
job name: `beamapp-root-0216042234-124125`
(f14756f20f567f62): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
with op.scoped_metrics_container:
File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
op.start()
File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
def start(self):
File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
with self.scoped_start_state:
File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
with self.shuffle_source.reader() as reader:
File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 152, …Run Code Online (Sandbox Code Playgroud) 我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见
有关我们管道的详情:
PubsubIO我们的数据源(无界PCollection)CombineFn和非常简单的窗口/触发策略JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j目前的测试方法:
OurPipeline.main(TestPipeline.convertToArgs(options)PubsubIO将从这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.
我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunner和pipeline.run()(不 pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.
所以,我有几个问题:
1)有没有办法运行管道然后稍后手动停止?
2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.
3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.
如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!
java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam
继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.
原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.
def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)
这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.
目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874
我目前的代码看起来像这样
class ReadFromGs(ReadAllFromText):
def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")
def expand(self, pvalue):
files = …Run Code Online (Sandbox Code Playgroud) python google-cloud-platform google-cloud-dataflow apache-beam