小编Ken*_*les的帖子

数据流映射侧输入问题

我在使用 DataflowRunner 创建 Map PCollectionView 时遇到问题。

下面的管道将 unbouded 计数输入与来自侧输入的值(包含 10 个生成的值)聚合在一起。在 gcp 上运行管道时,它会卡在 View.asMap() 转换中。更具体地说, ParDo(StreamingPCollectionViewWriter) 没有任何输出。

我用 dataflow 2.0.0-beta3 和 beam-0.7.0-SNAPSHOT 尝试了这个,但没有任何结果。请注意,使用本地 DirectRunner 时,我的管道运行没有任何问题。

难道我做错了什么?感谢所有帮助,在此先感谢您帮助我!

public class SimpleSideInputPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) { …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1641
查看次数

How to solve Duplicate values exception when I create PCollectionView&lt;Map&lt;String,String&gt;&gt;

I'm setting up a slow-changing lookup Map in my Apache-Beam pipeline. It continuously updates the lookup map. For each key in lookup map, I retrieve the latest value in the global window with accumulating mode. But it always meets Exception :

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this snippet code?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap()); …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-dataflow apache-beam apache-beam-io

5
推荐指数
1
解决办法
1332
查看次数

Apache Beam:为什么全局窗口9223371950454775中的聚合值的时间戳?

我们从Google Dataflow 1.9迁移到Apache Beam 0.6.应用globalwindow后,我们注意到行为更改为时间戳.在Google Dataflow 1.9中,我们会在窗口化/合并功能后在DoFn中获得正确的时间戳.现在我们为时间戳获得了一些巨大的价值,例如9223371950454775,全局窗口的默认行为是否在Apache Beam版本中发生了变化?

input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
      .apply(name(id, "Window"), Window
          .<KV<Long, ObjectNode >>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .discardingFiredPanes())
      .apply(name(id, "Group By Shard"), GroupByKey.create())
      .appy(.....) }
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

4
推荐指数
1
解决办法
798
查看次数

数据流错误:“客户端具有非平凡的本地状态和不可选择的状态”

我有一个可以在本地执行而没有任何错误的管道。我曾经在本地运行的管道中遇到此错误

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.
Run Code Online (Sandbox Code Playgroud)

我相信我通过降级到 apache-beam=2.3.0 解决了这个问题,然后在本地它会完美运行。

现在我正在使用DataflowRunner 并且在 requirements.txt 文件中我有以下依赖项

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7
Run Code Online (Sandbox Code Playgroud)

但我又犯了这个可怕的错误

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.
Run Code Online (Sandbox Code Playgroud)

为什么它给我 DataflowRunner 的错误而不是 DirectRunner?他们不应该使用相同的依赖项/环境吗?任何帮助,将不胜感激。

我读过这是解决它的方法,但是当我尝试它时,我仍然遇到相同的错误

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient
Run Code Online (Sandbox Code Playgroud)

来自https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

我之前在本地修复此问题的参考帖子:

在 apache-beam 作业中使用 …

python pickle google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
3326
查看次数

控制 Apache Beam Dataflow 管道中的并行性

我们正在尝试使用 Apache Beam(使用 Go SDK)和 Dataflow 来并行化一项耗时的任务。对于更多上下文,我们有缓存作业,它接受一些查询,跨数据库运行它并缓存它们。每个数据库查询可能需要几秒钟到几分钟的时间,我们希望并行运行这些查询以更快地完成任务。

创建了一个简单的管道,如下所示:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)

    ...
Run Code Online (Sandbox Code Playgroud)

排放的数量getCachePayloadsFn并不多,生产时大多为数百,最多可达数千。

现在的问题是cacheQueryDoFn不是并行执行,查询是逐个顺序执行的。我们通过在缓存函数中放入日志StartBundleProcessElement记录 goroutine id、进程 id、开始和结束时间等来确认这一点,以确认执行中没有重叠。

即使只有 10 个查询,我们也希望始终并行运行查询。根据我们的理解和文档,它根据整体输入创建捆绑包,这些捆绑包并行运行,并且在捆绑包内按顺序运行。有没有办法控制负载中的包数量或增加并行性?

我们尝试过的事情:

  1. 保持num_workers=2autoscaling_algorithm=None. 它启动两个虚拟机,但Setup仅在一个虚拟机上运行初始化 DoFn 的方法,并将其用于整个负载。
  2. 在这里 …

go google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
5072
查看次数

启动 Bigquery 作业的数据流作业间歇性失败并出现错误“错误”:[ {“消息”:“已经存在:作业

我每 6 分钟安排一个谷歌云数据流作业(使用 apache beam python sdk),它在内部从 Big Query Table 读取,进行一些转换并写入另一个 Big Query 表。此作业已开始间歇性失败(大约 10 次中的 4 次)并显示以下错误跟踪。

2021-02-17 14:51:18.146 ISTError message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 225, in execute
    self.response = self._perform_source_split_considering_api_limits(
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 233, in _perform_source_split_considering_api_limits
    split_response = self._perform_source_split(source_operation_split_task,
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 271, in _perform_source_split
    for split in source.split(desired_bundle_size):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 807, in split
    self.table_reference = self._execute_query(bq)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return …
Run Code Online (Sandbox Code Playgroud)

python-3.x google-bigquery google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
139
查看次数

非 KV 元素的 GroupIntoBatches

根据Apache Beam 2.0.0 SDK 文档 GroupIntoBatches仅适用于KV集合。

我的数据集只包含值,不需要引入键。然而,为了使用GroupIntoBatches我必须用一个空字符串作为键来实现“假”键:

static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}
Run Code Online (Sandbox Code Playgroud)

所以整体管道如下所示:

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;

  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(callWebService(c.element().getValue()));
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}
Run Code Online (Sandbox Code Playgroud)

有没有办法在不引入“假”密钥的情况下分组?

apache-beam

1
推荐指数
1
解决办法
2314
查看次数

使用 Apache Beam Python `WriteToFiles` 转换每个窗口只写一个文件

需要一些帮助。我有一些从 Pub/Sub 读取并写入 GCS 中的批处理文件的琐碎任务,但是在使用 fileio.WriteToFiles 时遇到了一些困难

with beam.Pipeline(options=pipeline_options) as p:
  input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Parse' >> beam.Map(parse_json)
             | ' data w' >> beam.WindowInto(
                 FixedWindows(60),
                 accumulation_mode=AccumulationMode.DISCARDING
             ))

  event_data = (input
             | 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
             | 'encode et' >> beam.Map(lambda x: json.dumps(x))
             | 'write events to file' >> fileio.WriteToFiles(
                    path='gs://extention/ga_analytics/events/', shards=0))
Run Code Online (Sandbox Code Playgroud)

窗口触发后我需要一个文件,但文件数等于来自 Pubsub 的消息数,有人可以帮助我吗? 当前输出文件, 但我只需要一个文件。

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
613
查看次数