标签: google-cloud-dataflow

将 STRUCT 数组从 Dataflow 写入大查询

我正在尝试将数据流管道中的结构数组字段写入大查询,生成的表的架构是正确的,但字段中没有填充任何数据。

我的 DoFn 功能:

public class ProcessIpBlocks {

    public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws JSONException {

            TableRow row = c.element();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar cal = Calendar.getInstance();


            long startIp = 0L, endIp = 0L;
            if(row.get("start_ip") != null)
                startIp = Long.parseLong((String)row.get("start_ip"));

            if(row.get("end_ip") != null)
                endIp = Long.parseLong((String)row.get("end_ip"));

            for(long i= startIp; i<=endIp; i++)
            {
                TableRow outputRow = new TableRow();
                outputRow.set("start_ip", startIp);
                outputRow.set("ip", i);

                if(row.get("postal_code") …
Run Code Online (Sandbox Code Playgroud)

java google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
1659
查看次数

从 Go 应用程序的数据流模板创建作业

我正在尝试通过 Go 应用程序从现有模板启动数据流作业。

到目前为止,我已经引入google.golang.org/api/dataflow/v1b3并创建了一个CreateJobFromTemplateRequest包含工作信息的文件。

现在,我如何使用 Compute Engine 中的内置服务帐号凭据执行该请求?

go google-cloud-dataflow

1
推荐指数
1
解决办法
838
查看次数

检查 PCollection 是否为空 - Apache Beam

有没有办法检查 PCollection 是否为空?

我在 Dataflow 和 Apache Beam 的文档中没有找到任何相关内容。

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
5779
查看次数

云功能:详细的堆栈跟踪:错误:找不到模块“googleapis”

这与以下 2 个线程相关: Google 云函数 - 无法读取属性“getApplicationDefault” 从云函数触发云数据流管道 - 函数超时

我创建了一个数据流模板,它将数据从 GCS 复制到 BigQuery,作为这两个示例。

作为初始化过程的一部分,我运行

npm init
npm install --save googleapis
Run Code Online (Sandbox Code Playgroud)

这是我的index.js

    var {google} = require('googleapis');

exports.goWithTheDataFlow  = (event, callback) => {


const file = event.data;
  const context = event.context;

  console.log(`Event ${context.eventId}`);
  console.log(`  Event Type: ${context.eventType}`);
  console.log(`  Bucket: ${file.bucket}`);
  console.log(`  File: ${file.name}`);
  console.log(`  Metageneration: ${file.metageneration}`);
  console.log(`  Created: ${file.timeCreated}`);
  console.log(`  Updated: ${file.updated}`);

  google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

 console.log(projectId);

 const dataflow = google.dataflow({ version: 'v1b3', auth: …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform google-cloud-dataflow google-cloud-functions

1
推荐指数
1
解决办法
6543
查看次数

如何检查作业为何在 Google Dataflow 上被终止(可能 OOM)

我有一个简单的任务。我有一堆文件( ~100GB in total ),每一行代表一个实体。我必须将此实体发送到 JanusGraph 服务器。

2018-07-07_05_10_46-8497016571919684639 <- job id
Run Code Online (Sandbox Code Playgroud)

一段时间后,我遇到 OOM,日志显示 Java 被杀死。

从数据流视图中,我可以看到以下日志:

Workflow failed. Causes: S01:TextIO.Read/Read+ParDo(Anonymous)+ParDo(JanusVertexConsumer) failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:

从堆栈驱动程序视图中,我可以看到: https: //www.dropbox.com/s/zvny7qwhl7hbwyw/Screenshot%202018-07-08%2010.05.33.png ?dl=0

日志显示: E Out of memory: Kill process 1180 (java) score 1100 or sacrifice child E Killed process 1180 (java) total-vm:4838044kB, anon-rss:383132kB, file-rss:0kB 更多信息请参见:https ://pastebin.com/raw/MftBwUxs

我怎样才能调试发生了什么?

java google-cloud-dataflow tinkerpop3 apache-beam janusgraph

1
推荐指数
1
解决办法
1758
查看次数

GCP Dataflow vCPU 使用情况和定价问题

我提交了一个 GCP 数据流管道,以从 GCP Pub/Sub 接收数据、解析并存储到 GCP 数据存储区。看起来工作很完美。

经过 21 天,我发现成本为 144.54 美元,工作时间为 2,094.72 小时。这意味着我提交后,即使没有从 Pub/Sub 接收(处理)任何数据,也会每秒收费。

这种行为正常吗?或者我设置了错误的参数?我以为CPU使用时间只有在接收到数据时才计算。

有什么方法可以降低相同工作模型的成本(从 Pub/Sub 接收并存储到数据存储)?

google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
1193
查看次数

如何从 PubSub 主题读取数据并将其解析到梁管道中并打印它

我有一个程序,它在 pubSub 中创建一个主题,并向该主题发布消息。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的 BigQuery 表中。现在我打算用 python 管道替换基于模板的作业,其中我的要求是从 PubSub 读取数据,应用转换并将数据保存到 BigQuery/发布到另一个 PubSub 主题。我开始用 python 编写脚本,并进行了大量的试验和错误来实现它,但令我沮丧的是,我无法实现它。代码如下所示:

import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    p = beam.Pipeline(options=o)

    print("I reached here")
    # # Read from PubSub into a PCollection.
    data = (
        p
        | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    )
    data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
    print("Lines: ", data)
run()
Run Code Online (Sandbox Code Playgroud)

如果我能尽早获得一些帮助,我将非常感激。注意:我在谷歌云上设置了我的项目,并且我的脚本在本地运行。

google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
4403
查看次数

数据流 - 函数未被调用 - 错误 - 名称未定义

我正在 Google Dataflow 上使用 Apache Beam,并通过 lambda 函数调用函数情感,但收到错误消息:函数名称未定义。

output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )
Run Code Online (Sandbox Code Playgroud)

这是我的 Apache Beam 调用,在最后一行中提到了函数情绪,这给我带来了问题。

函数代码如下(我认为这不重要):

def sentiment(messages):
    if not isinstance(messages, list):
        messages = [messages]

    instances = list(map(lambda message: json.loads(message), messages))
    lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
    for instance in instances['text']:
        response = lservice.documents().analyzeSentiment(
            body ={
                'document': {
                    'type': 'PLAIN_TEXT',
                    'content': …
Run Code Online (Sandbox Code Playgroud)

python dataflow google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1554
查看次数

如何访问 apache_beam.io.fileio.ReadableFile() 对象?

我正在尝试使用该apache_beam.io.fileio模块来读取文件lines.txt并将其合并到我的管道中。

lines.txt有以下内容:

line1
line2
line3
Run Code Online (Sandbox Code Playgroud)

当我运行以下管道代码时:

with beam.Pipeline(options=pipeline_options) as p:

     lines = (
         p
         | beam.io.fileio.MatchFiles(file_pattern="lines.txt")
         | beam.io.fileio.ReadMatches()
     )
     # print file contents to screen
     lines | 'print to screen' >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

我得到以下输出:

<apache_beam.io.fileio.ReadableFile object at 0x000001A8C6C55F08>
Run Code Online (Sandbox Code Playgroud)

我期望

line1
line2
line3
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到我预期的结果?

python google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1519
查看次数

Dataflow SQL - 不支持的类型地理

我正在尝试在 Google Big Query 上创建数据流 SQL,但收到此错误

Unsupported type for column centroid.centroid: GEOGRAPHY
Run Code Online (Sandbox Code Playgroud)

我找不到任何证据表明 Dataflow SQL 实际上不支持地理数据,并且文档中根本没有提及地理数据。是这样吗,为什么会这样,有什么解决办法吗?

google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
182
查看次数