I am running into a problem creating a Map PCollectionView with the DataflowRunner.
The pipeline below aggregates an unbounded counting input with the values of a side input (containing 10 generated values). When running the pipeline on GCP it gets stuck inside the View.asMap() transform. More specifically, ParDo(StreamingPCollectionViewWriter) does not produce any output.
I tried this with Dataflow 2.0.0-beta3 and with beam-0.7.0-SNAPSHOT, without any luck. Note that my pipeline runs without any problem when using the local DirectRunner.
Am I doing something wrong? Any help is appreciated, thanks in advance!
public class SimpleSideInputPipeline {
private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);
public interface Options extends DataflowPipelineOptions {}
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
final PCollectionView<Map<Integer, String>> sideInput = pipeline
.apply(CountingInput.forSubrange(0L, 10L))
.apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
@ProcessElement
public void processElement(ProcessContext c) { …

I'm setting up a slow-changing lookup map in my Apache Beam pipeline. It continuously updates the lookup map. For each key in the lookup map, I retrieve the latest value in the global window with accumulating mode, but it always hits this exception:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
Is anything wrong with this code snippet?
If I use .discardingFiredPanes() instead, I will lose information in the last emit.
pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes())
.apply(new ReadSlowChangingTable())
.apply(Latest.perKey())
.apply(View.asMap()); …

We migrated from Google Dataflow 1.9 to Apache Beam 0.6, and after applying a global window we noticed a change in behavior for the timestamps. With Google Dataflow 1.9 we would get the correct timestamp in the DoFn after the windowing/combine step. Now we get a huge value for the timestamp, e.g. 9223371950454775. Has the default timestamp behavior for the global window changed in the Apache Beam release?
input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
.apply(name(id, "Window"), Window
.<KV<Long, ObjectNode>>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes())
.apply(name(id, "Group By Shard"), GroupByKey.create())
.apply(.....) }
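For what it is worth, 9223371950454775 appears to match the maximum timestamp of the global window, i.e. the timestamp a grouping step assigns to its output when the output time resolves to the end of the window; whether that default changed between the SDKs is exactly what is unclear. The pipeline above is Java; purely as an illustration of one way to check where a timestamp comes from, here is a minimal Python-SDK sketch (all names are placeholders, not from the pipeline above):

import apache_beam as beam


class InspectTimestamps(beam.DoFn):
    """Logs each element's timestamp next to the maximum timestamp of its window."""

    def process(self, element,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
        # After a grouping step, the output timestamp is derived from the window
        # by default; for the global window that maximum is a very large value.
        yield '%r ts=%s window_max=%s' % (element, timestamp, window.max_timestamp())


# Usage (placeholder names): grouped | 'Inspect' >> beam.ParDo(InspectTimestamps()) | beam.Map(print)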
I have a pipeline that executes locally without any errors. I used to get this error in the locally run pipeline:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
I believe I solved that by downgrading to apache-beam==2.3.0, after which it ran perfectly locally.
Now I am using the DataflowRunner, and in the requirements.txt file I have the following dependencies:
apache-beam==2.3.0
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-storage==1.10.0
protobuf==3.5.2.post1
pytz==2013.7
But I am getting that dreaded error again:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
Why does it give me this error with the DataflowRunner but not with the DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.
I read that this is the way to solve it, but when I try it I still get the same error:
class MyDoFn(beam.DoFn):
    def start_bundle(self, process_context):
        self._dsclient = datastore.Client()

    def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient
From https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191
The post I used as a reference when I fixed this locally before:
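For reference, here is a minimal sketch of the pattern I understand those references to recommend, assuming a recent Beam Python SDK and the google-cloud-datastore client: build the client lazily in setup() (or start_bundle()) on the worker so it never has to be pickled together with the DoFn. The entity lookup is only a placeholder, not my actual pipeline logic:

import apache_beam as beam


class DatastoreLookupFn(beam.DoFn):
    """Creates the Datastore client on the worker instead of pickling it."""

    def __init__(self, project):
        # Only picklable configuration lives here.
        self._project = project
        self._client = None

    def setup(self):
        # Runs on the worker after the DoFn has been deserialized, so the
        # client object itself is never pickled.
        from google.cloud import datastore  # imported lazily on the worker
        self._client = datastore.Client(project=self._project)

    def process(self, element):
        # Placeholder logic: look up an entity whose key name is the element.
        key = self._client.key('Kind', element)  # 'Kind' is a placeholder
        yield self._client.get(key)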
We are trying to use Apache Beam (with the Go SDK) and Dataflow to parallelize a time-consuming task. For more context, we have a caching job that takes some queries, runs them across a database and caches them. Each database query may take from a few seconds to several minutes, and we want to run the queries in parallel so the task completes faster.
We created a simple pipeline, which looks like this:
// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...
The number of elements emitted by getCachePayloadsFn is not large: mostly a few hundred in production, and at most a few thousand.
The problem now is that cacheQueryDoFn is not executed in parallel; the queries run sequentially, one by one. We confirmed this by adding logs in StartBundle and ProcessElement of the caching function that record the goroutine id, process id, start and end times, etc., verifying that the executions do not overlap.
Even with only 10 queries, we would like them to always run in parallel. From our understanding and the documentation, bundles are created from the overall input; bundles run in parallel, while elements within a bundle run sequentially. Is there a way to control the number of bundles for the load, or to increase the parallelism?
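Before listing what we tried, here is the shape of one suggestion we are considering: breaking fusion between the fan-out step and the expensive ParDo with a reshuffle, so the runner can redistribute the elements into more bundles. Our pipeline is Go; the sketch below uses the Python SDK purely to illustrate the idea, and every name in it is a placeholder:

import apache_beam as beam


class RunQueryFn(beam.DoFn):
    """Placeholder for the expensive per-query work (our cacheQueryDoFn)."""

    def process(self, query):
        # ... run the query against the database and cache the result ...
        yield {'query': query, 'status': 'cached'}


def expand_cache_pipeline(p, queries):
    return (p
            | 'CreatePayloads' >> beam.Create(queries)
            # Reshuffle breaks fusion, so the expensive ParDo below can be
            # distributed across many bundles/workers instead of staying fused
            # to the single upstream bundle that produced the payloads.
            | 'Redistribute' >> beam.Reshuffle()
            | 'RunQuery' >> beam.ParDo(RunQueryFn()))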
Things we have tried:
Setting num_workers=2 and autoscaling_algorithm=None. It starts two VMs, but the Setup method that initializes the DoFn runs on only one VM, and that VM is used for the entire load.

I schedule a Google Cloud Dataflow job (using the Apache Beam Python SDK) every 6 minutes; internally it reads from a BigQuery table, performs some transformations and writes to another BigQuery table. This job has started failing intermittently (about 4 times out of 10) with the following error trace:
2021-02-17 14:51:18.146 IST Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 225, in execute
self.response = self._perform_source_split_considering_api_limits(
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 233, in _perform_source_split_considering_api_limits
split_response = self._perform_source_split(source_operation_split_task,
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 271, in _perform_source_split
for split in source.split(desired_bundle_size):
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 807, in split
self.table_reference = self._execute_query(bq)
File "/usr/local/lib/python3.8/site-packages/apache_beam/options/value_provider.py", line 135, in _f
return …

python-3.x google-bigquery google-cloud-dataflow apache-beam
According to the Apache Beam 2.0.0 SDK documentation, GroupIntoBatches works only with KV collections.
My dataset contains only values and there is no need to introduce keys. However, in order to use GroupIntoBatches I had to implement a "fake" key with an empty string as the key:
static class FakeKVFn extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}
So the overall pipeline looks like this:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
long batchSize = 100L;
p.apply("ReadLines", TextIO.read().from("./input.txt"))
.apply("FakeKV", ParDo.of(new FakeKVFn()))
.apply(GroupIntoBatches.<String, String>ofSize(batchSize))
.setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(callWebService(c.element().getValue()));
}
}))
.apply("WriteResults", TextIO.write().to("./output/"));
p.run().waitUntilFinish();
}
Is there a way to group into batches without introducing a "fake" key?
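For comparison only (the snippet above is the Java SDK): the Python SDK has a keyless batching transform, BatchElements, which batches elements without requiring a key (it batches within bundles, so the semantics are not identical to GroupIntoBatches). A minimal sketch with assumed file paths and a placeholder web-service call:

import apache_beam as beam


def call_web_service(batch):
    # Placeholder for the real web-service call on a batch of lines.
    return 'processed %d lines' % len(batch)


with beam.Pipeline() as p:
    (p
     | 'ReadLines' >> beam.io.ReadFromText('./input.txt')       # assumed local path
     | 'Batch' >> beam.BatchElements(min_batch_size=100,
                                     max_batch_size=100)        # keyless batching
     | 'CallWebService' >> beam.Map(call_web_service)
     | 'WriteResults' >> beam.io.WriteToText('./output/results'))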
I need some help. I have the seemingly trivial task of reading from Pub/Sub and writing batched files to GCS, but I am having some difficulty with fileio.WriteToFiles:
with beam.Pipeline(options=pipeline_options) as p:
    input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Parse' >> beam.Map(parse_json)
             | ' data w' >> beam.WindowInto(
                 FixedWindows(60),
                 accumulation_mode=AccumulationMode.DISCARDING
             ))

    event_data = (input
                  | 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
                  | 'encode et' >> beam.Map(lambda x: json.dumps(x))
                  | 'write events to file' >> fileio.WriteToFiles(
                      path='gs://extention/ga_analytics/events/', shards=0))
I need a single file after the window fires, but the number of files equals the number of messages coming from Pub/Sub. Can anyone help me? These are the current output files, but I only need one file per window.
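One workaround I have been considering (I am not sure it is the intended way to use fileio in streaming) is to combine each window's elements into a single batch and write that batch from one ParDo, so every fired pane produces exactly one object. A minimal sketch, where encoded_events stands for the output of the 'encode et' step above and the object naming scheme is an assumption:

import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO


class WriteWindowToGcs(beam.DoFn):
    """Writes the whole batch for one window pane as a single GCS object."""

    def process(self, batch, window=beam.DoFn.WindowParam):
        # One object per fired window; the path and naming scheme are assumptions.
        path = 'gs://extention/ga_analytics/events/%s-%s.json' % (
            window.start.to_utc_datetime().strftime('%Y%m%d%H%M%S'),
            window.end.to_utc_datetime().strftime('%Y%m%d%H%M%S'))
        with GcsIO().open(path, 'w') as f:
            f.write('\n'.join(batch).encode('utf-8'))
        yield path


# encoded_events stands for the output of the 'encode et' step above,
# used in place of the fileio.WriteToFiles step:
single_file_per_window = (
    encoded_events
    | 'Batch per window' >> beam.CombineGlobally(
        beam.combiners.ToListCombineFn()).without_defaults()
    | 'Write one file per window' >> beam.ParDo(WriteWindowToGcs()))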