标签: dataflow

TPL 数据流:在保持秩序的同时设计并行性

我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:

  1. 在框架上绘制一些文本/位图
  2. 裁剪框架
  3. 调整框架大小
  4. 将图像减少到 256 色

显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。

所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。

c# asynchronous dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1374
查看次数

TPL 数据流块永远不会在 PropagateCompletion 上完成

自从对我的传播完成管道进行了最后一次更改后,我的一个缓冲区块从未完成。让我总结一下什么是有效的,什么不再是:

以前工作:

A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();

// everything completes
Run Code Online (Sandbox Code Playgroud)

不再工作:

A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);

await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);

D.Receive();

// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either
Run Code Online (Sandbox Code Playgroud)

如果我取消注释注释行,一切正常,但显然该行不是必需的。

不知何故,我的 BufferBlock B 永远不会完成,即使链接它的块已完成并传播其完成,并且它链接的块接收所有缓冲项。

.net c# dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
795
查看次数

在 python Apache Beam 中打开一个 gzip 文件

目前是否可以使用 Apache Beam 从 python 中的 gzip 文件中读取?我的管道正在使用以下代码行从 gcs 中提取 gzip 文件:

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 
Run Code Online (Sandbox Code Playgroud)

但我收到此错误:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)

我们在 python beam 源代码中注意到,在写入接收器时似乎处理了压缩文件。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

更详细的追溯:

Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run() 
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", …
Run Code Online (Sandbox Code Playgroud)

python dataflow google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
3360
查看次数

在 Flink 中的操作员之间共享状态

我想知道在 Flink 中是否可以在运营商之间共享状态。

例如,假设我在操作符上按键进行分区,并且我需要分A区内的一段分区状态C(出于任何原因)(图 1.a),或者我需要C下游操作符中的操作符状态F(图 1 .b)

在此处输入图片说明

我知道可以broadcast记录到所有分区。因此,如果您在记录中包含操作符的内部状态,则可以与下游操作符共享您的内部状态。
然而,这可能是一个昂贵的操作,而不是简单地让op1具体请求op2状态。

最近围绕可查询状态的发展是否朝着这个概念发展,或者它们只是为了让外部用户查询拓扑的内部状态?

预先感谢您的见解

dataflow stream-processing apache-flink

5
推荐指数
1
解决办法
1498
查看次数

如何从源代码为任何应用程序创建数据流图 (DFG/SDFG)

我进行了大量研究,以弄清楚如何从应用程序的源代码为应用程序创建 DFG。对于某些应用程序,例如 MP3 解码器、JPEG 压缩和 H.263 解码器,可以在线使用 DFG。

我无法弄清楚如何从源代码中为 HEVC 等应用程序创建 DFG?是否有任何工具可以为此类复杂的应用程序立即生成数据流图,还是必须手动完成?

请就此事给我建议。

编辑:我将 Doxygen 用于 HEVC,我可以看到不同的功能如何相互交互。然而,每个函数都有许多入口和出口点,一段时间后 Doxygen 的输出变得太混乱而无法理解。

我还看了 StreamIt:http ://camlunity.ru/swap/Library/Conflux/Stream%20Programming/streamit-cc_stream_graph_programming_language.pdf

它看起来很方便,但它为更简单的应用程序(如 MP3 解码器)生成的图表太复杂了。为了生成连贯的 DFG,我是否必须重新编写整个源代码?

algorithm parallel-processing open-source dataflow data-structures

5
推荐指数
1
解决办法
2988
查看次数

BigQuery 无法插入作业。工作流失败

我需要通过 Dataflow 和 Beam 运行从 GCS 到 BigQuery 的批处理作业。我的所有文件都是具有相同架构的 avro。我创建了一个数据流 java 应用程序,它在较小的数据集(~1gb,大约 5 个文件)上成功。但是当我尝试在更大的数据集(>500gb,>1000 个文件)上运行它时,我收到一条错误消息

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix 1b83679a4f5d48c5b45ff20b2b822728_6e48345728d4da6cb51353f0dc550c1b_00001_00000, reached max retries: 3, last failed load job: ...
Run Code Online (Sandbox Code Playgroud)

3 次重试后,它终止于:

Workflow failed. Causes: S57....... A work item was attempted 4 times without success....
Run Code Online (Sandbox Code Playgroud)

这一步是对 BigQuery 的加载。

堆栈驱动程序说处理卡在步骤....10m00s...和

Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes.....
Run Code Online (Sandbox Code Playgroud)

我查找了 409 错误代码,指出我可能有一个现有的作业、数据集或表。我已经删除了所有表并重新运行了应用程序,但它仍然显示相同的错误消息。

我目前仅限于 65 个工人,我让他们使用 …

dataflow google-cloud-platform stackdriver

5
推荐指数
1
解决办法
2830
查看次数

TPL Dataflow BufferBlock 线程安全吗?

我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。

为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>

一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。

send data to the同时有两个“生产者BufferBlock”

简化:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for …
Run Code Online (Sandbox Code Playgroud)

c# dataflow producer-consumer task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2198
查看次数

在 Dataflow 上运行的 Apache Beam 管道无法从 KafkaIO 读取:SSL 握手失败

我正在构建一个 Apache Beam 管道,以从 Kafka 读取作为无界源。

我能够使用直接运行器在本地运行它。

但是,在云上使用 Google Cloud Dataflow 运行器运行时,管道会因附加的异常堆栈跟踪而失败。

似乎最终是Conscrypt Java 库抛出了javax.net.ssl.SSLException: Unable to parse TLS packet header. 我不确定如何解决这个问题。

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        java.util.concurrent.FutureTask.report(FutureTask.java:122)
        java.util.concurrent.FutureTask.get(FutureTask.java:206)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75) …
Run Code Online (Sandbox Code Playgroud)

ssl dataflow apache-kafka google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1667
查看次数

How to solve Duplicate values exception when I create PCollectionView&lt;Map&lt;String,String&gt;&gt;

I'm setting up a slow-changing lookup Map in my Apache-Beam pipeline. It continuously updates the lookup map. For each key in lookup map, I retrieve the latest value in the global window with accumulating mode. But it always meets Exception :

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this snippet code?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap()); …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-dataflow apache-beam apache-beam-io

5
推荐指数
1
解决办法
1332
查看次数

使用 Python 在 Apache Beam 管道中进行异常处理

我正在使用 python 中的 Apache Beam(在 GCP Dataflow 上)做一个简单的管道,从 PubSub 读取并在 Big Query 上写入,但无法处理管道上的异常以创建替代流。

在一个简单的 WriteToBigQuery 示例中:

output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')
Run Code Online (Sandbox Code Playgroud)

我试图把它放在一个try/except代码中,但它不起作用,因为当它失败时,异常似乎被抛出到我的 python 执行之外的 Java 层上:

INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
    request.instruction_id)
...
...
...
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", …
Run Code Online (Sandbox Code Playgroud)

python dataflow google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
3777
查看次数