阅读Cloud Dataflow文档后,我仍然不确定如何从App Engine运行数据流作业.可能吗?我的后端用Python或Java编写是否相关?谢谢!
我对使用Google Cloud Dataflow非常陌生.我想获得两个PCollections的笛卡尔积.举例来说,如果我有两个PCollections (1, 2)和("hello", "world"),它们的笛卡尔乘积是((1, "hello"), (1, "world"), (2, "hello"), (2, "world")).
任何想法我怎么能这样做?此外,由于笛卡尔积可能很大,我希望解决方案可以懒得创建产品,从而避免大量内存消耗.
谢谢!
我想利用时间分区表的新BigQuery功能,但不确定这在1.6版本的Dataflow SDK中是否可行.
查看BigQuery JSON API,要创建一个分区表,需要传入一个
"timePartitioning": { "type": "DAY" }
Run Code Online (Sandbox Code Playgroud)
选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference.
我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?
这似乎与设置当前不可用的表到期时间类似.
我正在使用来自流式数据流管道的DatastoreIO,并在使用相同的密钥编写实体时收到错误.
2016-12-10T22:51:04.385Z: Error: (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
Run Code Online (Sandbox Code Playgroud)
如果我在密钥中使用随机数,那么事情可行,但我需要更新相同的密钥,那么有没有使用DataStoreIO执行此操作的事务方法?
static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
private static final long serialVersionUID = 0;
private final String namespace;
private final String kind;
CreateEntityFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
}
public Entity makeEntity(String key, Tile tile) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(kind, key );
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder.build()); …Run Code Online (Sandbox Code Playgroud) 我创建了一个Pipeline,它从GCS中的文件读取,转换它,最后写入BQ表.该文件包含标题行(字段).
有没有办法以编程方式设置"跳过的标题行数",就像加载时在BQ中可以做的那样?

我有一个连接到pub/sub的流管道,它发布了GCS文件的文件名.从那里我想读取每个文件并解析每一行上的事件(事件是我最终想要处理的事件).
我可以使用TextIO吗?当在执行期间定义文件名时,您是否可以在流管道中使用它(而不是使用TextIO作为源,并且fileName(s)在构造时已知).如果不是,我正在考虑做以下事情:
从pub/sub ParDo获取主题以读取每个文件并获取行处理文件的行...
我可以使用FileBasedReader或类似的东西来读取文件吗?文件不是太大,所以我不需要并行读取单个文件,但我需要读取大量文件.
我正试图找到一种优雅地结束我的工作的方式,以免丢失任何数据,从PubSub流式传输并写入BigQuery.
我可以设想的一种可能的方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道是否/如何实现这一点.
BigQuery可以从Google云端硬盘中读取联合来源.看到这里.我希望能够将BigQuery中的表读入指向Drive文档的Dataflow管道中.
将BigQuery挂接到Drive中的文件非常正常:
但是,当我尝试将该表读入我的Dataflow管道时,我(可以理解)得到以下错误:
找不到合适的凭据来访问Google云端硬盘.联系表所有者以获取帮助.
[..]
PCollection<TableRow> results = pipeline.apply("whatever",
BigQueryIO.Read.fromQuery("SELECT * from [CPT_7414_PLAYGROUND.google_drive_test]"))
.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
[..]
Run Code Online (Sandbox Code Playgroud)
我如何使Dataflow能够从BigQuery中指向Drive的表中读取权限?
我们有一个流管道,我们已启用自动缩放功能.通常,一个工作人员足以处理传入的数据,但是如果存在积压工作,我们希望自动增加工作人员的数量.
我们的管道从Pubsub读取,并且每3分钟使用加载作业将批次写入BigQuery.我们从一个工作者开始运行此管道,向pubsub发布两倍于一个工作者可以使用的数据.2小时后,自动缩放仍未启动,因此积压数据约为1小时.考虑到自动调节旨在将积压保持在10秒以下(根据此SO答案),这似乎相当差.
这里的文档说,流媒体作业的自动调节是测试版,并且如果接收器是高延迟的,那么已知它是粗粒度的.是的,我想每3分钟做一次BigQuery批次就算是高延迟!在改进此自动缩放算法方面是否有任何进展?
在此期间我们可以做任何解决方法,例如测量管道中不同点的吞吐量吗?我找不到有关如何将吞吐量报告给自动扩展系统的任何文档.
我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)中累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。
我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。
这是到目前为止我所获得的(Apache Beam Python SDK 2.10.0):
import apache_beam as beam
TOPIC_PATH="projects/<my-project>/topics/<my-topic>"
def CombineFn(e):
return "\n".join(e)
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
| "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
| "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
| "Output" >> beam.io.WriteToText("<GCS path or local path>"))
res = p.run()
res.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
我在本地环境中运行该程序没有问题。
python main.py
Run Code Online (Sandbox Code Playgroud)
它在本地运行,但可以从指定的Pub / Sub主题读取,并且每隔30秒就会按预期写入指定的GCS路径。
但是现在的问题是,当我在Google Cloud Platform(即Cloud Dataflow)上运行它时,它不断发出神秘的异常。
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most …Run Code Online (Sandbox Code Playgroud) python google-cloud-pubsub google-cloud-dataflow apache-beam