标签: dataflow

java.lang.ClassCastException:com.google.gson.internal.LinkedTreeMap 无法转换为 java.util.LinkedHashMap

我很抱歉提出关于这个一般问题的另一个问题,但我在 SO 上发现的问题似乎都与我的问题密切相关。

我有一个现有的、工作的数据流管道,它接受对象KV<Long, Iterable<TableRow>>并输出TableRow对象。这段代码在我们的生产环境中,运行没有问题。然而,我现在正在尝试使用直接运行器实现单元测试来测试此管道,但是单元测试在到达线路时失败

LinkedHashMap<String, Object> evt = (LinkedHashMap<String, Object>) row.get(Schema.EVT);
Run Code Online (Sandbox Code Playgroud)

在管道中,抛出错误消息:

java.lang.ClassCastException:com.google.gson.internal.LinkedTreeMap 无法转换为 java.util.LinkedHashMap

现有数据流代码的简化版本如下所示:

public static class Process extends DoFn<KV<Long, Iterable<TableRow>>, TableRow> {

    /* private variables */

    /* constructor */

    /* private functions */

    @ProcessElement
    public void processElement(ProcessContext c) throws InterruptedException, ParseException {       

      EventProcessor eventProc = new EventProcessor();
      Processor.WorkItem workItem = new Processor.WorkItem();
      Iterator<TableRow> it = c.element().getValue().iterator();

      // process all TableRows having the same id
      while (it.hasNext()) {
        TableRow item = it.next();

        if …
Run Code Online (Sandbox Code Playgroud)

java dataflow google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
8075
查看次数

无法获取应用程序默认凭据。在本地运行

我正在尝试使用示例从 DataFlow 的 GCP Pub/Sub 检索数据。

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import avro.shaded.com.google.common.collect.Lists;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

public class StreamDemoConsumer {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
        @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
        String getOutput();

        void setOutput(String s);

        @Description("Input topic")
        @Default.String("projects/coexon-seoul-dev/topics/trading") …
Run Code Online (Sandbox Code Playgroud)

java dataflow google-cloud-platform google-cloud-pubsub

5
推荐指数
2
解决办法
6510
查看次数

防止 Apache Beam / Dataflow 流 (python) 管道中的融合以消除管道瓶颈

我们目前正在使用 DataflowRunner 在 Apache Beam 上开发流式管道。我们正在从 Pub/Sub 读取消息并对它们进行一些处理,然后我们在滑动窗口中将它们窗口化(当前窗口大小为 3 秒,间隔也是 3 秒)。一旦窗口被触发,我们对窗口内的元素进行一些后处理。这个后处理步骤明显大于窗口大小,大约需要 15 秒。

管道的apache梁代码:

input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
                   | beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                                  trigger=AfterCount(30), 
                                  accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
Run Code Online (Sandbox Code Playgroud)

如您所知,Dataflow 会尝试对您的管道步骤执行一些优化。在我们的例子中,它从窗口开始(集群操作:1/处理 2/窗口+后处理)将所有内容融合在一起,这导致只有 1 个工作人员对所有窗口进行缓慢的顺序后处理。我们每 15 秒看到一次管道正在处理下一个窗口的日志。但是,我们希望让多个工作人员选择单独的窗口,而不是将工作量交给单个工作人员。

因此,我们正在寻找防止这种融合发生的方法,因此 Dataflow 将窗口与窗口的后处理分开。通过这种方式,我们希望 Dataflow 能够再次将多个工作人员分配给触发窗口的后处理。

到目前为止我们尝试过的:

最后两个动作确实创建了第三个集群操作(1/ processing 2/ windowing 3/ post-processing ),但我们注意到在窗口化之后仍然是同一个worker在执行所有操作。

是否有任何解决方案可以解决此问题陈述?

我们现在正在考虑的当前解决方法是构建另一个接收窗口的流管道,以便这些工作人员可以并行处理窗口,但它很麻烦。

dataflow google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1166
查看次数

慢慢改变 BigQuery 的查找缓存 - Dataflow Python Streaming SDK

我正在尝试遵循缓慢变化的查找缓存的设计模式(https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1)用于在 DataFlow 上使用适用于 Apache Beam 的 Python SDK 的流式传输管道。

\n\n

我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为 ParDo 操作的侧输入传递,但无论我们如何设置触发器/窗口,它都不会刷新。

\n\n
class FilterAlertDoFn(beam.DoFn):\n  def process(self, element, alertlist):\n\n    print len(alertlist)\n    print alertlist\n\n    \xe2\x80\xa6  # function logic\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))\n                        | \xe2\x80\x98alert_side_input\xe2\x80\x99 >> beam.WindowInto(\n                            beam.window.GlobalWindows(),\n                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(\n                                late=trigger.AfterCount(1)\n                            )),\n                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING\n                          )\n                       | beam.Map(lambda elem: elem[\xe2\x80\x98SOMEKEY\xe2\x80\x99])\n)\n\n...\n\n\nmain_input | \xe2\x80\x98alerts\xe2\x80\x99 >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

基于此处的 I/O 页面 ( https://beam.apache.org/documentation/io/built-in/ ),它表示 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源那么在这个方法中能不能刷新\xe2\x80\x99呢?

\n\n

尝试在源上设置非全局窗口会导致侧输入中出现空 PCollection。

\n\n
\n\n

更新 …

python streaming dataflow apache-beam

5
推荐指数
1
解决办法
1222
查看次数

使用 Azure 数据工厂 (ADF) 数据流 (DF) 从/向 Azure Data Lake Store gen1 发送和接收数据

我有一个 Azure Data Lake Store gen1 (ADLS-1) 和一个带有数据流 (DF) 的 Azure 数据工厂 (ADF) (V2)。当我在 ADF 中创建新的 DF 并在源和/或接收器节点中选择 ADLS-1 中的数据集时,我收到以下验证错误(在 DF 中):

source1 AzureDataLakeStore does not support MSI authentication in Data Flow.

这是否意味着我无法将 DF 与 ADLS-1 一起使用,或者这是某种身份验证问题?

我尝试过的事情列表:

  • 我已在 ADLS-1 的访问控制 (IAM) 中为 ADF 资源授予所有者角色
  • 我已向 ADF 资源授予数据集 ADLS-1 文件夹中的所有(读、写等)权限
  • 我可以在 ADF 管道中将数据从 ADLS-1 复制到 ADLS-1(因此在 DF 之外)
  • 我可以在 DF 的源节点和接收器节点中为来自 ADLS-2 (gen 2) 的数据集选择数据集(所以这里我没有收到错误)
  • 我可以创建一个管道,首先将数据集从 ADLS-1 复制到 ADLS-2,然后使用 DF 对其进行处理(然后将其复制回来)。这个解决方法非常乏味,而且我(目前)还没有生产 ADLS-2。
  • 这里说ADLS-1 支持的功能包括映射数据流 (DF)。

如果有人知道将 DF 与 ADLS-1 …

dataflow azure-data-factory

5
推荐指数
1
解决办法
1432
查看次数

Apache Beam 中的侧面输入与普通构造函数参数

我有一个关于 的背景下的侧面输入和广播的一般性问题Apache Beam。期间计算所需的任何其他变量、列表、映射是否processElement需要作为辅助输入传递?如果它们作为普通构造函数参数传递可以吗DoFn?例如,如果我有一些固定(未计算)值变量(常量,如开始日期、结束日期),我想在processElement. PCollectionView现在,我可以分别从每个变量中创建单例,并将它们DoFn作为侧面输入传递给构造函数。但是,我可以不这样做,而只是将每个常量作为普通构造函数参数传递给 吗DoFn?我在这里错过了一些微妙的东西吗?

就代码而言,我什么时候应该这样做:

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
  // these are singleton views
  private final PCollectionView<LocalDateTime> dateStartView;
  private final PCollectionView<LocalDateTime> dateEndView;

  public MyFilter(PCollectionView<LocalDateTime> dateStartView,
                       PCollectionView<LocalDateTime> dateEndView){

      this.dateStartView = dateStartView;
      this.dateEndView = dateEndView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception{
  // extract date values from the singleton views here and use them
Run Code Online (Sandbox Code Playgroud)

与以下相反:

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> { …
Run Code Online (Sandbox Code Playgroud)

dataflow broadcast data-processing google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
2538
查看次数

Apache Beam 中的架构是什么?

我正在阅读有关 Apache BEAM 中的SCHEMAS 的文档,但我无法理解它的目的是什么、如何、为什么或在哪些情况下我应该使用它们。使用模式或使用扩展 Serialized 接口的类有什么区别?

该文档有一个示例:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public String bank;
  public double purchaseAmount;
}
Run Code Online (Sandbox Code Playgroud)
PCollection<TransactionPojos> transactionPojos = readTransactionsAsPojo();
Run Code Online (Sandbox Code Playgroud)

但它没有解释readTransactionsAsPojo函数是如何构建的。我认为对此有很多遗漏的解释。

dataflow apache-beam

5
推荐指数
1
解决办法
1589
查看次数

TPL数据流与异步操作一起使用

我正在通过移植一些旧的套接字代码以使用TPL数据流和新的异步功能来尝试TPL数据流。尽管API感觉很坚如磐石,但我的代码仍然最终变得混乱。我想知道我是否在这里错过了什么。

我的要求如下:一个套接字类公开:Open,Close,Send和Receive方法。全部都返回一个Task,因此是异步的。打开和关闭是原子的。尽管发送和接收一次只能处理1条命令,但它们可以彼此相邻工作。

从逻辑上讲,这使我进入下一个内部控制代码:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
2190
查看次数

基于Java的简单工作流管理器/数据工作流,具有启动ext的能力.应用程序,调用Web服务等

首先,如果在stackoverflow上已经存在类似我的问题,抱歉,但我还没有找到它.实际上我不知道我可以使用什么标签来搜索我需要的解决方案.

基本上,我需要一个工具/软件,可以在整个过程中使用多个工具/操作来管理数据(对象)流.当然,现有的BPM /工作流平台工具之一可能会这样做,但它们似乎对我的要求来说过于复杂.

我有一个用JPA/Hibernate构建的"静态"数据模型.然后我需要更改该静态模型,以便在其上使用不同的处理函数.该函数可以是一些java类,Web服务或外部应用程序(支持批处理模式).之后,我需要捕获这些函数的输出并进行一些可视化,绘制一些图表等.我可以假设所有这些处理函数都可以访问静态模型,并且可以将其更改为特定的模型,因此不需要将输入传递给他们.另一方面,它们的输出应该由主"工作流程管理器"捕获.

还有一件事,整个过程应该在没有任何用户交互的情况下自动运行(可能将来会改变,但现在看起来和现在).在流程开始之前,管理员应该定义使用哪个"处理功能",就是这样.另一件事......最好的情况是,如果整个过程在数据库状态发生变化时被触发,但这并不重要,我可以通过调用Web服务来启动它.

问题是:我应该使用现有的BPM /工作流工具之一,如jBPM或Activiti,自己编写一个简单的"工作流管理器",或使用比jBPM/Activiti(有没有?)简单得多的现有工具. .当然我更喜欢最简单的方法......

非常感谢任何反馈.

java workflow dataflow data-processing

4
推荐指数
1
解决办法
1万
查看次数

为什么块按此顺序运行?

这是一个简短的代码示例,可以快速向您介绍我的问题:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
782
查看次数