标签: google-cloud-dataflow

Google Cloud Dataflow和Google Cloud Dataproc有什么区别?

我正在使用Google Data Flow来实现ETL数据仓库解决方案.

看看谷歌云产品,似乎DataProc也可以做同样的事情.

DataProc似乎比DataFlow便宜一点.

有没有人知道DataFlow上DataFlow的优缺点

为什么谷歌同时提供?

google-cloud-dataflow google-cloud-dataproc

31
推荐指数
5
解决办法
2万
查看次数

Apache Beam:FlatMap vs Map?

我想了解在哪种情况下我应该使用FlatMap或Map.我的文档似乎并不清楚.

我仍然不明白在哪种情况下我应该使用FlatMap或Map的转换.

有人能给我一个例子,这样我才能理解他们的不同之处吗?

我理解Spark中FlatMap与Map的区别,但不确定是否有相似之处?

google-cloud-dataflow apache-beam

25
推荐指数
2
解决办法
6289
查看次数

Google Dataflow与Apache Spark

我正在调查Google DataflowApache Spark,以确定哪个更适合我们的bigdata分析业务需求.

我发现有Spark SQLMLlib火花平台上做结构化数据查询和机器学习.

我想知道Google Dataflow平台中是否有相应的解决方案?

distributed-computing bigdata apache-spark google-cloud-dataflow google-cloud-ml

24
推荐指数
2
解决办法
9066
查看次数

谷歌数据流作业成本优化

我已经为 100 GB 大小的 522 个 gzip 文件运行了以下代码,解压缩后,它将是大约 320 GB 数据和 protobuf 格式的数据,并将输出写入 GCS。我已经使用 n1 标准机器和区域进行输入,输出都得到了照顾,工作花费了我大约 17 美元,这是半小时的数据,所以我真的很需要在这里做一些成本优化。

我从下面的查询中得到的成本

SELECT l.value AS JobID,  ROUND(SUM(cost),3) AS JobCost 
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill, 
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and 
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1
Run Code Online (Sandbox Code Playgroud)

完整代码

  import time
  import sys
  import argparse
  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.pipeline_options import SetupOptions
  import csv
  import base64
  from google.protobuf import timestamp_pb2
  from google.protobuf.json_format import MessageToDict …
Run Code Online (Sandbox Code Playgroud)

python protocol-buffers avro google-cloud-dataflow apache-beam

22
推荐指数
1
解决办法
574
查看次数

使用DoFn使用Cloud Dataflow从PubSub写入Google云端存储

我正在尝试使用Google Cloud Dataflow将Google PubSub消息写入Google云端存储.我知道TextIO/AvroIO不支持流媒体管道.但是,我在[1]中读到,可以ParDo/DoFn通过作者的评论在流式传输管道中写入GCS .我尽可能地按照他们的文章构建了一条管道.

我的目标是这种行为:

  • 消息以最多100个批次写入GCS中的对象(每个窗口窗格一个),该路径对应于发布消息的时间dataflow-requests/[isodate-time]/[paneIndex].

我得到了不同的结果:

  • 每小时窗口中只有一个窗格.因此,我只在每小时"桶"中获得一个文件(它实际上是GCS中的对象路径).将MAX_EVENTS_IN_FILE减少到10没有区别,仍然只有一个窗格/文件.
  • 每个GCS对象中只有一条消息被写出
  • 写入GCS时,管道偶尔会引发CRC错误.

我如何解决这些问题并获得我期待的行为?

示例日志输出:

21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
Run Code Online (Sandbox Code Playgroud)

这是我的代码:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import …
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage google-cloud-pubsub google-cloud-dataflow

16
推荐指数
1
解决办法
4420
查看次数

通过Google Pub/Sub + Dataflow直接流入BigQuery的优缺点

我们在Google Kubernetes Engine上托管了NodeJS API,我们想开始将事件记录到BigQuery中.

我可以看到3种不同的方法:

  1. 使用API​​中的Node BigQuery SDK将每个事件直接插入BigQuery(如"流式插入示例"中所述:https://cloud.google.com/bigquery/streaming-data-into-bigquery或此处:https:/ /github.com/googleapis/nodejs-bigquery/blob/7d7ead644e1b9fe8428462958dbc9625fe6c99c8/samples/tables.js#L367)
  2. 将每个事件发布到Cloud Pub/Sub主题,然后编写Cloud Dataflow管道以将其流式传输到BigQuery(仅在Java或Python中),例如https://blog.doit-intl.com/replacing-mixpanel- with-bigquery-dataflow-and-kubernetes-b5f844710674https://github.com/bomboradata/pubsub-to-bigquery
  3. 将每个事件从API发布到Pub/Sub主题,但不是Dataflow使用自定义工作进程,该进程在一侧订阅Pub/Sub主题并在另一侧流入BQ.像这里:https: //github.com/GoogleCloudPlatform/kubernetes-bigquery-python/blob/master/pubsub/pubsub-pipe-image/pubsub-to-bigquery.py 或在这里:https://github.com/mchon89 /Google_PubSub_BigQuery/blob/master/pubsub_to_bigquery.py

对于这个特定的用例,我们不需要进行任何转换,只是将事件直接发送到正确的格式.但是我们稍后可能会有其他用例,我们需要将主数据存储区(MySQL)中的表同步到BQ进行分析,所以可能马上从Dataflow开始值得吗?

几个问题 :

  • 选项1(直接向BQ发送单个事件)似乎最简单,如果你没有任何变换.它是否像发布Pub/Sub主题一样快速可靠?我主要关注延迟和错误/重复处理(https://cloud.google.com/bigquery/troubleshooting-errors#streaming).也许这在单独的过程中做得更好?
  • 对于选项2,是否有任何数据流"预设"不需要您编写自定义代码,只需要从Pu​​b/Sub +读取可靠地发送到BQ而不进行转换(可能只是重复数据删除/错误处理)
  • 有一个简单的自定义工作程序(选项3)是否有任何缺点,从Pub/Sub读取然后流入BQ并执行所有错误处理/重试等?

google-bigquery google-cloud-pubsub google-cloud-dataflow

16
推荐指数
1
解决办法
2052
查看次数

窗口化后,Google数据流流媒体管道不会将工作负载分配给多个工作人员

我正在尝试在python中设置数据流流管道.我对批处理管道有很多经验.我们的基本架构如下所示: 在此输入图像描述

第一步是进行一些基本处理,每条消息大约需要2秒才能进入窗口.我们使用3秒和3秒间隔的滑动窗口(可能会稍后更改,因此我们有重叠的窗口).作为最后一步,我们有SOG预测需要大约15秒的时间来处理,这显然是我们的瓶颈变换.

因此,我们似乎面临的问题是,在窗口化之前工作负载完全分布在我们的工作者上,但最重要的转换根本不是分布式的.所有的窗户一次一个地处理,看似1个工人,而我们有50个可用.

日志告诉我们,sog预测步骤每15秒输出一次,如果窗口将被更多的工作人员处理,则不应该是这种情况,因此这会在一段时间内产生巨大的延迟,这是我们不想要的.使用1分钟的消息,最后一个窗口的延迟为5分钟.当分配工作时,这应该只有大约15秒(SOG预测时间).所以在这一点上我们是无能为力的..

在此输入图像描述

有没有人看到我们的代码是否有问题或如何防止/规避这个?似乎这是谷歌云数据流内部发生的事情.这是否也出现在Java流媒体管道中?

在批处理模式下,一切正常.在那里,人们可以尝试进行重新洗牌以确保不会发生融合等.但是在流媒体窗口化之后,这是不可能的.

args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto( …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

14
推荐指数
1
解决办法
363
查看次数

安排Google Cloud Dataflow作业的最简便方法

我只需要每天运行一个数据流管道,但在我看来,建议像App Engine Cron Service这样需要构建整个Web应用程序的解决方案似乎有点太多了.我正在考虑从Compute Engine Linux VM中的cron作业运行管道,但这可能太简单了:).这样做有什么问题,为什么不是任何人(除了我,我猜)建议它?

google-cloud-dataflow

13
推荐指数
3
解决办法
2468
查看次数

如何将CSV文件导入BigQuery表而不使用任何列名或模式?

我目前正在编写一个Java实用程序,用于将几个CSV文件从GCS导入BigQuery.我可以很容易地实现这一点bq load,但我想使用Dataflow作业来实现.所以我使用Dataflow的Pipeline和ParDo转换器(返回TableRow将它应用于BigQueryIO),我已经为转换创建了StringToRowConverter().这里实际问题开始了 - 我被迫为目标表指定模式,虽然我不想创建一个新表,如果它不存在 - 只是尝试加载数据.所以我不想手动设置TableRow的列名,因为我有大约600列.

public class StringToRowConverter extends DoFn<String, TableRow> {

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

public void processElement(ProcessContext c) {
    TableRow row = new TableRow();          
    row.set("DO NOT KNOW THE COLUMN NAME", c.element());
    c.output(row);
}
}
Run Code Online (Sandbox Code Playgroud)

此外,假设该表已存在于BigQuery数据集中,我不需要创建它,并且CSV文件也包含正确顺序的列.

如果此方案没有解决方法,并且数据加载需要列名,那么我可以将其放在CSV文件的第一行中.

任何帮助将不胜感激.

java csv google-bigquery google-cloud-dataflow

13
推荐指数
1
解决办法
793
查看次数

如何删除gcloud Dataflow作业?

Dataflow作业在我的仪表板上杂乱无章,我想从我的项目中删除失败的作业.但是在仪表板中,我没有看到任何删除Dataflow作业的选项.我至少在寻找类似下面的东西,

$ gcloud beta dataflow jobs delete JOB_ID
Run Code Online (Sandbox Code Playgroud)

要删除所有工作,

$ gcloud beta dataflow jobs delete
Run Code Online (Sandbox Code Playgroud)

有人可以帮我这个吗?

google-cloud-dataflow

13
推荐指数
1
解决办法
5778
查看次数