我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:
显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。
所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。
自从对我的传播完成管道进行了最后一次更改后,我的一个缓冲区块从未完成。让我总结一下什么是有效的,什么不再是:
以前工作:
A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();
// everything completes
Run Code Online (Sandbox Code Playgroud)
不再工作:
A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);
D.Receive();
// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either
Run Code Online (Sandbox Code Playgroud)
如果我取消注释注释行,一切正常,但显然该行不是必需的。
不知何故,我的 BufferBlock B 永远不会完成,即使链接到它的块已完成并传播其完成,并且从它链接的块接收所有缓冲项。
目前是否可以使用 Apache Beam 从 python 中的 gzip 文件中读取?我的管道正在使用以下代码行从 gcs 中提取 gzip 文件:
beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP'))
Run Code Online (Sandbox Code Playgroud)
但我收到此错误:
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
Run Code Online (Sandbox Code Playgroud)
我们在 python beam 源代码中注意到,在写入接收器时似乎处理了压缩文件。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
更详细的追溯:
Traceback (most recent call last):
File "beam-playground.py", line 11, in <module>
p.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
return self.runner.run(self)
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
super(DirectPipelineRunner, self).run(pipeline)
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
pipeline.visit(RunVisitor(self))
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
self._root_transform().visit(visitor, self, visited)
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", …Run Code Online (Sandbox Code Playgroud) 我想知道在 Flink 中是否可以在运营商之间共享状态。
例如,假设我在操作符上按键进行分区,并且我需要分A区内的一段分区状态C(出于任何原因)(图 1.a),或者我需要C下游操作符中的操作符状态F(图 1 .b)
我知道可以broadcast记录到所有分区。因此,如果您在记录中包含操作符的内部状态,则可以与下游操作符共享您的内部状态。
然而,这可能是一个昂贵的操作,而不是简单地让op1具体请求op2状态。
最近围绕可查询状态的发展是否朝着这个概念发展,或者它们只是为了让外部用户查询拓扑的内部状态?
预先感谢您的见解
我进行了大量研究,以弄清楚如何从应用程序的源代码为应用程序创建 DFG。对于某些应用程序,例如 MP3 解码器、JPEG 压缩和 H.263 解码器,可以在线使用 DFG。
我无法弄清楚如何从源代码中为 HEVC 等应用程序创建 DFG?是否有任何工具可以为此类复杂的应用程序立即生成数据流图,还是必须手动完成?
请就此事给我建议。
编辑:我将 Doxygen 用于 HEVC,我可以看到不同的功能如何相互交互。然而,每个函数都有许多入口和出口点,一段时间后 Doxygen 的输出变得太混乱而无法理解。
我还看了 StreamIt:http ://camlunity.ru/swap/Library/Conflux/Stream%20Programming/streamit-cc_stream_graph_programming_language.pdf
它看起来很方便,但它为更简单的应用程序(如 MP3 解码器)生成的图表太复杂了。为了生成连贯的 DFG,我是否必须重新编写整个源代码?
algorithm parallel-processing open-source dataflow data-structures
我需要通过 Dataflow 和 Beam 运行从 GCS 到 BigQuery 的批处理作业。我的所有文件都是具有相同架构的 avro。我创建了一个数据流 java 应用程序,它在较小的数据集(~1gb,大约 5 个文件)上成功。但是当我尝试在更大的数据集(>500gb,>1000 个文件)上运行它时,我收到一条错误消息
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix 1b83679a4f5d48c5b45ff20b2b822728_6e48345728d4da6cb51353f0dc550c1b_00001_00000, reached max retries: 3, last failed load job: ...
Run Code Online (Sandbox Code Playgroud)
3 次重试后,它终止于:
Workflow failed. Causes: S57....... A work item was attempted 4 times without success....
Run Code Online (Sandbox Code Playgroud)
这一步是对 BigQuery 的加载。
堆栈驱动程序说处理卡在步骤....10m00s...和
Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes.....
Run Code Online (Sandbox Code Playgroud)
我查找了 409 错误代码,指出我可能有一个现有的作业、数据集或表。我已经删除了所有表并重新运行了应用程序,但它仍然显示相同的错误消息。
我目前仅限于 65 个工人,我让他们使用 …
我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。
为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>
一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。
send data to the同时有两个“生产者BufferBlock”
简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for …Run Code Online (Sandbox Code Playgroud) c# dataflow producer-consumer task-parallel-library tpl-dataflow
我正在构建一个 Apache Beam 管道,以从 Kafka 读取作为无界源。
我能够使用直接运行器在本地运行它。
但是,在云上使用 Google Cloud Dataflow 运行器运行时,管道会因附加的异常堆栈跟踪而失败。
似乎最终是Conscrypt Java 库抛出了javax.net.ssl.SSLException: Unable to parse TLS packet header. 我不确定如何解决这个问题。
java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
java.util.concurrent.FutureTask.report(FutureTask.java:122)
java.util.concurrent.FutureTask.get(FutureTask.java:206)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75) …Run Code Online (Sandbox Code Playgroud) I'm setting up a slow-changing lookup Map in my Apache-Beam pipeline. It continuously updates the lookup map. For each key in lookup map, I retrieve the latest value in the global window with accumulating mode. But it always meets Exception :
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
Is anything wrong with this snippet code?
If I use .discardingFiredPanes() instead, I will lose information in the last emit.
pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes())
.apply(new ReadSlowChangingTable())
.apply(Latest.perKey())
.apply(View.asMap()); …Run Code Online (Sandbox Code Playgroud) 我正在使用 python 中的 Apache Beam(在 GCP Dataflow 上)做一个简单的管道,从 PubSub 读取并在 Big Query 上写入,但无法处理管道上的异常以创建替代流。
在一个简单的 WriteToBigQuery 示例中:
output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')
Run Code Online (Sandbox Code Playgroud)
我试图把它放在一个try/except代码中,但它不起作用,因为当它失败时,异常似乎被抛出到我的 python 执行之外的 Java 层上:
INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
request.instruction_id)
...
...
...
self.signature.finish_bundle_method.method_value())
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", …Run Code Online (Sandbox Code Playgroud) dataflow ×10
apache-beam ×4
c# ×3
tpl-dataflow ×3
python ×2
.net ×1
algorithm ×1
apache-flink ×1
apache-kafka ×1
asynchronous ×1
open-source ×1
ssl ×1
stackdriver ×1