我想从我的云功能中调用其他Google API,例如,在收到Pubsub发送的消息后将文件写入云存储.我怎样才能做到这一点?
我想利用时间分区表的新BigQuery功能,但不确定这在1.6版本的Dataflow SDK中是否可行.
查看BigQuery JSON API,要创建一个分区表,需要传入一个
"timePartitioning": { "type": "DAY" }
Run Code Online (Sandbox Code Playgroud)
选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference.
我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?
这似乎与设置当前不可用的表到期时间类似.
我正在尝试在Dataflow中执行联合操作.是否有用于在Dataflow中结合两个PCollection的示例代码?
如何从Google Cloud功能启动Cloud Dataflow作业?我想使用Google Cloud Functions作为启用跨服务合成的机制.
我正在尝试使用以下代码从 pub/sub 中读取
Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
@Override
public String apply(PubsubMessage input) {
LOG.info("hola " + input.getAttributeMap());
return new String(input.getMessage());
}
});
PCollection<String> pps = p.apply(pubsub)
.apply(
Window.<String>into(
FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
LOG.info("hola amigo "+c.element());
c.output(c.element());
}
}));
Run Code Online (Sandbox Code Playgroud)
与我在 NodeJS 上收到的相比,我收到了将包含在该data字段中的消息。我怎样才能得到这个ackId字段(我以后可以用它来确认消息)?我正在打印的属性映射是null. 是否有其他方法可以确认所有消息而无需弄清楚 ackId?
我正在使用 DataflowPipelineRunner 创建数据流作业。我尝试了以下场景。
在上述所有场景中,输入是来自 GCS 的一个非常小的文件(KB 大小),输出是大查询表。
我在所有情况下都出现内存不足错误
我编译的代码的大小是 94mb。我只尝试字数统计示例,它没有读取任何输入(在作业开始之前它失败了)。请帮助我理解为什么我会收到此错误。
注意:我正在使用 appengine 来开始这项工作。
注意:相同的代码适用于 beta 版本0.4.150414
编辑 1
根据答案中的建议尝试了以下操作,
经过这些配置,Java Heap Memory 问题就解决了。但它试图将一个 jar 上传到超过 10Mb 的暂存位置,因此它失败了。
它记录以下异常
com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at …Run Code Online (Sandbox Code Playgroud)