我很抱歉提出关于这个一般问题的另一个问题,但我在 SO 上发现的问题似乎都与我的问题密切相关。
我有一个现有的、工作的数据流管道,它接受对象KV<Long, Iterable<TableRow>>并输出TableRow对象。这段代码在我们的生产环境中,运行没有问题。然而,我现在正在尝试使用直接运行器实现单元测试来测试此管道,但是单元测试在到达线路时失败
LinkedHashMap<String, Object> evt = (LinkedHashMap<String, Object>) row.get(Schema.EVT);
Run Code Online (Sandbox Code Playgroud)
在管道中,抛出错误消息:
java.lang.ClassCastException:com.google.gson.internal.LinkedTreeMap 无法转换为 java.util.LinkedHashMap
现有数据流代码的简化版本如下所示:
public static class Process extends DoFn<KV<Long, Iterable<TableRow>>, TableRow> {
/* private variables */
/* constructor */
/* private functions */
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException, ParseException {
EventProcessor eventProc = new EventProcessor();
Processor.WorkItem workItem = new Processor.WorkItem();
Iterator<TableRow> it = c.element().getValue().iterator();
// process all TableRows having the same id
while (it.hasNext()) {
TableRow item = it.next();
if …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用此示例从 DataFlow 的 GCP Pub/Sub 检索数据。
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import avro.shaded.com.google.common.collect.Lists;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
public class StreamDemoConsumer {
public static interface MyOptions extends DataflowPipelineOptions {
@Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
@Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
String getOutput();
void setOutput(String s);
@Description("Input topic")
@Default.String("projects/coexon-seoul-dev/topics/trading") …Run Code Online (Sandbox Code Playgroud) 我们目前正在使用 DataflowRunner 在 Apache Beam 上开发流式管道。我们正在从 Pub/Sub 读取消息并对它们进行一些处理,然后我们在滑动窗口中将它们窗口化(当前窗口大小为 3 秒,间隔也是 3 秒)。一旦窗口被触发,我们对窗口内的元素进行一些后处理。这个后处理步骤明显大于窗口大小,大约需要 15 秒。
管道的apache梁代码:
input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
| beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
trigger=AfterCount(30),
accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
Run Code Online (Sandbox Code Playgroud)
如您所知,Dataflow 会尝试对您的管道步骤执行一些优化。在我们的例子中,它从窗口开始(集群操作:1/处理 2/窗口+后处理)将所有内容融合在一起,这导致只有 1 个工作人员对所有窗口进行缓慢的顺序后处理。我们每 15 秒看到一次管道正在处理下一个窗口的日志。但是,我们希望让多个工作人员选择单独的窗口,而不是将工作量交给单个工作人员。
因此,我们正在寻找防止这种融合发生的方法,因此 Dataflow 将窗口与窗口的后处理分开。通过这种方式,我们希望 Dataflow 能够再次将多个工作人员分配给触发窗口的后处理。
到目前为止我们尝试过的:
最后两个动作确实创建了第三个集群操作(1/ processing 2/ windowing 3/ post-processing ),但我们注意到在窗口化之后仍然是同一个worker在执行所有操作。
是否有任何解决方案可以解决此问题陈述?
我们现在正在考虑的当前解决方法是构建另一个接收窗口的流管道,以便这些工作人员可以并行处理窗口,但它很麻烦。
我正在尝试遵循缓慢变化的查找缓存的设计模式(https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1)用于在 DataFlow 上使用适用于 Apache Beam 的 Python SDK 的流式传输管道。
\n\n我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为 ParDo 操作的侧输入传递,但无论我们如何设置触发器/窗口,它都不会刷新。
\n\nclass FilterAlertDoFn(beam.DoFn):\n def process(self, element, alertlist):\n\n print len(alertlist)\n print alertlist\n\n \xe2\x80\xa6 # function logic\nRun Code Online (Sandbox Code Playgroud)\n\nalert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))\n | \xe2\x80\x98alert_side_input\xe2\x80\x99 >> beam.WindowInto(\n beam.window.GlobalWindows(),\n trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(\n late=trigger.AfterCount(1)\n )),\n accumulation_mode=trigger.AccumulationMode.ACCUMULATING\n )\n | beam.Map(lambda elem: elem[\xe2\x80\x98SOMEKEY\xe2\x80\x99])\n)\n\n...\n\n\nmain_input | \xe2\x80\x98alerts\xe2\x80\x99 >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))\nRun Code Online (Sandbox Code Playgroud)\n\n基于此处的 I/O 页面 ( https://beam.apache.org/documentation/io/built-in/ ),它表示 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源那么在这个方法中能不能刷新\xe2\x80\x99呢?
\n\n尝试在源上设置非全局窗口会导致侧输入中出现空 PCollection。
\n\n更新 …
我有一个 Azure Data Lake Store gen1 (ADLS-1) 和一个带有数据流 (DF) 的 Azure 数据工厂 (ADF) (V2)。当我在 ADF 中创建新的 DF 并在源和/或接收器节点中选择 ADLS-1 中的数据集时,我收到以下验证错误(在 DF 中):
source1
AzureDataLakeStore does not support MSI authentication in Data Flow.
这是否意味着我无法将 DF 与 ADLS-1 一起使用,或者这是某种身份验证问题?
我尝试过的事情列表:
如果有人知道将 DF 与 ADLS-1 …
我有一个关于 的背景下的侧面输入和广播的一般性问题Apache Beam。期间计算所需的任何其他变量、列表、映射是否processElement需要作为辅助输入传递?如果它们作为普通构造函数参数传递可以吗DoFn?例如,如果我有一些固定(未计算)值变量(常量,如开始日期、结束日期),我想在processElement. PCollectionView现在,我可以分别从每个变量中创建单例,并将它们DoFn作为侧面输入传递给构造函数。但是,我可以不这样做,而只是将每个常量作为普通构造函数参数传递给 吗DoFn?我在这里错过了一些微妙的东西吗?
就代码而言,我什么时候应该这样做:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
// these are singleton views
private final PCollectionView<LocalDateTime> dateStartView;
private final PCollectionView<LocalDateTime> dateEndView;
public MyFilter(PCollectionView<LocalDateTime> dateStartView,
PCollectionView<LocalDateTime> dateEndView){
this.dateStartView = dateStartView;
this.dateEndView = dateEndView;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// extract date values from the singleton views here and use them
Run Code Online (Sandbox Code Playgroud)
与以下相反:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> { …Run Code Online (Sandbox Code Playgroud) dataflow broadcast data-processing google-cloud-dataflow apache-beam
我正在阅读有关 Apache BEAM 中的SCHEMAS 的文档,但我无法理解它的目的是什么、如何、为什么或在哪些情况下我应该使用它们。使用模式或使用扩展 Serialized 接口的类有什么区别?
该文档有一个示例:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
public String bank;
public double purchaseAmount;
}
Run Code Online (Sandbox Code Playgroud)
PCollection<TransactionPojos> transactionPojos = readTransactionsAsPojo();
Run Code Online (Sandbox Code Playgroud)
但它没有解释readTransactionsAsPojo函数是如何构建的。我认为对此有很多遗漏的解释。
我正在通过移植一些旧的套接字代码以使用TPL数据流和新的异步功能来尝试TPL数据流。尽管API感觉很坚如磐石,但我的代码仍然最终变得混乱。我想知道我是否在这里错过了什么。
我的要求如下:一个套接字类公开:Open,Close,Send和Receive方法。全部都返回一个Task,因此是异步的。打开和关闭是原子的。尽管发送和接收一次只能处理1条命令,但它们可以彼此相邻工作。
从逻辑上讲,这使我进入下一个内部控制代码:
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;
// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new …Run Code Online (Sandbox Code Playgroud) 首先,如果在stackoverflow上已经存在类似我的问题,抱歉,但我还没有找到它.实际上我不知道我可以使用什么标签来搜索我需要的解决方案.
基本上,我需要一个工具/软件,可以在整个过程中使用多个工具/操作来管理数据(对象)流.当然,现有的BPM /工作流平台工具之一可能会这样做,但它们似乎对我的要求来说过于复杂.
我有一个用JPA/Hibernate构建的"静态"数据模型.然后我需要更改该静态模型,以便在其上使用不同的处理函数.该函数可以是一些java类,Web服务或外部应用程序(支持批处理模式).之后,我需要捕获这些函数的输出并进行一些可视化,绘制一些图表等.我可以假设所有这些处理函数都可以访问静态模型,并且可以将其更改为特定的模型,因此不需要将输入传递给他们.另一方面,它们的输出应该由主"工作流程管理器"捕获.
还有一件事,整个过程应该在没有任何用户交互的情况下自动运行(可能将来会改变,但现在看起来和现在).在流程开始之前,管理员应该定义使用哪个"处理功能",就是这样.另一件事......最好的情况是,如果整个过程在数据库状态发生变化时被触发,但这并不重要,我可以通过调用Web服务来启动它.
问题是:我应该使用现有的BPM /工作流工具之一,如jBPM或Activiti,自己编写一个简单的"工作流管理器",或使用比jBPM/Activiti(有没有?)简单得多的现有工具. .当然我更喜欢最简单的方法......
非常感谢任何反馈.
这是一个简短的代码示例,可以快速向您介绍我的问题:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var secondBlock = new TransformBlock<int,string>(async x =>
{
if (x == 12)
{
await Task.Delay(5000);
return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
}
return $"{DateTime.Now}: Message is {x}";
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var thirdBlock = new ActionBlock<string>(s => …Run Code Online (Sandbox Code Playgroud) dataflow ×10
apache-beam ×5
java ×3
.net ×2
c# ×2
tpl-dataflow ×2
broadcast ×1
python ×1
streaming ×1
workflow ×1