我正在使用Google Data Flow来实现ETL数据仓库解决方案.
看看谷歌云产品,似乎DataProc也可以做同样的事情.
DataProc似乎比DataFlow便宜一点.
有没有人知道DataFlow上DataFlow的优缺点
为什么谷歌同时提供?
我想了解在哪种情况下我应该使用FlatMap或Map.我的文档似乎并不清楚.
我仍然不明白在哪种情况下我应该使用FlatMap或Map的转换.
有人能给我一个例子,这样我才能理解他们的不同之处吗?
我理解Spark中FlatMap与Map的区别,但不确定是否有相似之处?
我正在调查Google Dataflow和Apache Spark,以确定哪个更适合我们的bigdata分析业务需求.
我发现有Spark SQL和MLlib火花平台上做结构化数据查询和机器学习.
我想知道Google Dataflow平台中是否有相应的解决方案?
distributed-computing bigdata apache-spark google-cloud-dataflow google-cloud-ml
我已经为 100 GB 大小的 522 个 gzip 文件运行了以下代码,解压缩后,它将是大约 320 GB 数据和 protobuf 格式的数据,并将输出写入 GCS。我已经使用 n1 标准机器和区域进行输入,输出都得到了照顾,工作花费了我大约 17 美元,这是半小时的数据,所以我真的很需要在这里做一些成本优化。
我从下面的查询中得到的成本
SELECT l.value AS JobID, ROUND(SUM(cost),3) AS JobCost
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill,
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1
Run Code Online (Sandbox Code Playgroud)
完整代码
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import csv
import base64
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict …Run Code Online (Sandbox Code Playgroud) python protocol-buffers avro google-cloud-dataflow apache-beam
我正在尝试使用Google Cloud Dataflow将Google PubSub消息写入Google云端存储.我知道TextIO/AvroIO不支持流媒体管道.但是,我在[1]中读到,可以ParDo/DoFn通过作者的评论在流式传输管道中写入GCS .我尽可能地按照他们的文章构建了一条管道.
我的目标是这种行为:
dataflow-requests/[isodate-time]/[paneIndex].我得到了不同的结果:
我如何解决这些问题并获得我期待的行为?
示例日志输出:
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
Run Code Online (Sandbox Code Playgroud)
这是我的代码:
package com.example.dataflow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import …Run Code Online (Sandbox Code Playgroud) google-cloud-storage google-cloud-pubsub google-cloud-dataflow
我们在Google Kubernetes Engine上托管了NodeJS API,我们想开始将事件记录到BigQuery中.
我可以看到3种不同的方法:
对于这个特定的用例,我们不需要进行任何转换,只是将事件直接发送到正确的格式.但是我们稍后可能会有其他用例,我们需要将主数据存储区(MySQL)中的表同步到BQ进行分析,所以可能马上从Dataflow开始值得吗?
几个问题 :
我正在尝试在python中设置数据流流管道.我对批处理管道有很多经验.我们的基本架构如下所示:

第一步是进行一些基本处理,每条消息大约需要2秒才能进入窗口.我们使用3秒和3秒间隔的滑动窗口(可能会稍后更改,因此我们有重叠的窗口).作为最后一步,我们有SOG预测需要大约15秒的时间来处理,这显然是我们的瓶颈变换.
因此,我们似乎面临的问题是,在窗口化之前工作负载完全分布在我们的工作者上,但最重要的转换根本不是分布式的.所有的窗户一次一个地处理,看似1个工人,而我们有50个可用.
日志告诉我们,sog预测步骤每15秒输出一次,如果窗口将被更多的工作人员处理,则不应该是这种情况,因此这会在一段时间内产生巨大的延迟,这是我们不想要的.使用1分钟的消息,最后一个窗口的延迟为5分钟.当分配工作时,这应该只有大约15秒(SOG预测时间).所以在这一点上我们是无能为力的..
有没有人看到我们的代码是否有问题或如何防止/规避这个?似乎这是谷歌云数据流内部发生的事情.这是否也出现在Java流媒体管道中?
在批处理模式下,一切正常.在那里,人们可以尝试进行重新洗牌以确保不会发生融合等.但是在流媒体窗口化之后,这是不可能的.
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
job_name='XX',
num_workers=args.workers,
max_num_workers=MAX_NUM_WORKERS,
disk_size_gb=DISK_SIZE_GB,
local=args.local,
streaming=args.streaming)
pipeline = beam.Pipeline(options=pipeline_options)
# Build pipeline
# pylint: disable=C0330
if args.streaming:
frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
subscription=SUBSCRIPTION_PATH,
with_attributes=True,
timestamp_attribute='timestamp'
))
frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
create_frame_tuples_fn)
crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
pred_bbox_tfserv_fn, SERVER_URL)
sliding_windows = bboxs | 'Window' >> beam.WindowInto( …Run Code Online (Sandbox Code Playgroud) 我只需要每天运行一个数据流管道,但在我看来,建议像App Engine Cron Service这样需要构建整个Web应用程序的解决方案似乎有点太多了.我正在考虑从Compute Engine Linux VM中的cron作业运行管道,但这可能太简单了:).这样做有什么问题,为什么不是任何人(除了我,我猜)建议它?
我目前正在编写一个Java实用程序,用于将几个CSV文件从GCS导入BigQuery.我可以很容易地实现这一点bq load,但我想使用Dataflow作业来实现.所以我使用Dataflow的Pipeline和ParDo转换器(返回TableRow将它应用于BigQueryIO),我已经为转换创建了StringToRowConverter().这里实际问题开始了 - 我被迫为目标表指定模式,虽然我不想创建一个新表,如果它不存在 - 只是尝试加载数据.所以我不想手动设置TableRow的列名,因为我有大约600列.
public class StringToRowConverter extends DoFn<String, TableRow> {
private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);
public void processElement(ProcessContext c) {
TableRow row = new TableRow();
row.set("DO NOT KNOW THE COLUMN NAME", c.element());
c.output(row);
}
}
Run Code Online (Sandbox Code Playgroud)
此外,假设该表已存在于BigQuery数据集中,我不需要创建它,并且CSV文件也包含正确顺序的列.
如果此方案没有解决方法,并且数据加载需要列名,那么我可以将其放在CSV文件的第一行中.
任何帮助将不胜感激.
Dataflow作业在我的仪表板上杂乱无章,我想从我的项目中删除失败的作业.但是在仪表板中,我没有看到任何删除Dataflow作业的选项.我至少在寻找类似下面的东西,
$ gcloud beta dataflow jobs delete JOB_ID
Run Code Online (Sandbox Code Playgroud)
要删除所有工作,
$ gcloud beta dataflow jobs delete
Run Code Online (Sandbox Code Playgroud)
有人可以帮我这个吗?