标签: dataflow

使用 C++11 异步功能的管道数据流

我正在尝试实现具有以下功能的多线程管道数据流框架:

  1. 流水线可以被描述为一个无环的有向图。每个节点执行一些处理并具有任意数量的任意类型的输入和一个任意类型的输出。

  2. 对于每个给定的输入数据实例,每个节点不应执行多次,之后应缓存结果。尽管此缓存不应该在内存中持续所需的时间更长,并且应该在任何其他节点不再需要时将其删除。

  3. 每个节点都应该支持惰性求值,即应该只在其他节点需要它的输出时才执行。

是否可以通过使用 C++11 多线程特性来实现这一点,尤其是std::future,std::promisestd::async? 任何人都可以提供线索吗?

c++ multithreading asynchronous pipeline dataflow

1
推荐指数
1
解决办法
1642
查看次数

从 Google Cloud BigQuery 读取数据

我是 Pipeline 世界和 Google API DataFlow 的新手。

我想使用 sqlQuery 从 BigQuery 读取数据。当我读取所有数据库时,它工作正常。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .from("test:DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)

但是当我使用 fromQuery 时出现错误。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .fromQuery("SELECT * FROM DataSetTest.data"));
Run Code Online (Sandbox Code Playgroud)

错误:

线程“main”中出现异常 java.lang.IllegalArgumentException:查询“SELECT * FROM DataSetTest.data”的验证失败。如果查询依赖于管道的早期阶段,则可以使用#withoutValidation 禁用此验证。

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)

在com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)

在com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)

在 com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)

在com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)

在 Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)

引起原因:java.lang.NullPointerException:必须指定必需参数projectId。

在 com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)

在 com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)

在 com.google.api.services.bigquery.Bigquery$Jobs$Query。(Bigquery.java:1751)

在 com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)

在 com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)

... 6 更多 …

java google-app-engine dataflow google-bigquery google-cloud-dataflow

1
推荐指数
1
解决办法
3842
查看次数

为什么在 Scio 中你更喜欢聚合而不是 groupByKey?

从:

https://github.com/spotify/scio/wiki/Scio-data-guideline

“比 groupByKey 更喜欢组合/聚合/减少转换。请记住,减少操作必须是关联的和可交换的。”

为什么特别喜欢聚合而不是 groupByKey?

scala dataflow apache-beam spotify-scio

1
推荐指数
1
解决办法
1261
查看次数

SideInput I/O 会降低性能

我正在使用 Python SDK 2.15.0 构建数据流管道。在此管道中,我需要在管道的多个阶段将附加数据加入到每个元素。

所有这些附加数据都是从 Google Cloud Storage 上的 avro 文件中读取的(Dataflow 和 GCS 存储桶使用的同一区域),使用 map 函数将其组织为键值元组,然后使用 pvalue.AsDict( 作为侧输入传递到 DoFn )。侧面输入数据在管道执行期间不会改变。

第一次连接(侧面输入大小 ~ 1 MB)进行得非常顺利。然而,第二次连接确实表现不佳。它的 sideinput 大小约为 50 MB。

数据流执行图清楚地显示了导致性能不佳的原因:我的 ParDo 步骤消耗的大约 90% 的时间都花在了读取侧面输入上。即使我只使用四个工作节点,从 sideinput 读取的数据量也超出了其实际大小几个数量级。

我能做些什么来防止这种情况发生吗?我是否需要以某种方式配置工作缓存大小?在我的 DoFn 的设置方法中准备附加数据而不是将其作为 sideinput 传递会更好吗?

以下是我准备侧面输入的方法:

sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
                              | "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))

# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \ …
Run Code Online (Sandbox Code Playgroud)

python dataflow apache-beam

1
推荐指数
1
解决办法
657
查看次数

数据流 - 函数未被调用 - 错误 - 名称未定义

我正在 Google Dataflow 上使用 Apache Beam,并通过 lambda 函数调用函数情感,但收到错误消息:函数名称未定义。

output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )
Run Code Online (Sandbox Code Playgroud)

这是我的 Apache Beam 调用,在最后一行中提到了函数情绪,这给我带来了问题。

函数代码如下(我认为这不重要):

def sentiment(messages):
    if not isinstance(messages, list):
        messages = [messages]

    instances = list(map(lambda message: json.loads(message), messages))
    lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
    for instance in instances['text']:
        response = lservice.documents().analyzeSentiment(
            body ={
                'document': {
                    'type': 'PLAIN_TEXT',
                    'content': …
Run Code Online (Sandbox Code Playgroud)

python dataflow google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1554
查看次数

Apache NIFI 安装后登录问题

我是 Apache NIFI 新手。我已在本地 Windows 计算机上安装了 Apache NIFI。现在,它要求用户名和密码登录。您知道我在哪里可以找到或设置密码吗?我在安装过程中没有设置任何用户名和密码。我非常感谢您尽快回复。请看图片附件-> Nifi登录需要用户名和密码

dataflow apache-nifi

1
推荐指数
1
解决办法
1万
查看次数

SSIS需要带有2个列标题的平面文件输出相同

我正在尝试使用SSIS平面文件目标,但是无法解决使输出文件具有命名为同一事物的两列的问题。

我要求输出文件具有列标题:

first1, last1, email, shortname, email
Run Code Online (Sandbox Code Playgroud)

每当我尝试映射源数据时,都会收到错误消息,提示诸如“此列名称已存在”和“有多个数据源列,名称为”电子邮件”之类的信息。

最好的解决方法是什么?

谢谢

ssis etl dataflow flat-file

0
推荐指数
1
解决办法
2936
查看次数

BigQueryIO Read vs fromQuery

在Dataflow/Apache Beam程序中说,我正在尝试读取具有指数级增长数据的表.我想提高读取的性能.

BigQueryIO.Read.from("projectid:dataset.tablename")
Run Code Online (Sandbox Code Playgroud)

要么

BigQueryIO.Read.fromQuery("SELECT A, B FROM [projectid:dataset.tablename]")
Run Code Online (Sandbox Code Playgroud)

如果我只选择表中所需的列,而不是上面的整个表,那么我的读取性能会提高吗?

我知道选择几列会降低成本.但是想知道上面的读取性能.

dataflow google-bigquery google-cloud-dataflow

0
推荐指数
1
解决办法
326
查看次数

数据和流程图

我应该使用哪个图来描述这样的链:

Input data->preprocessing->preprocessed data->
algorithm 1->if a good result, next step, if not - do algorithm 1 again...
Run Code Online (Sandbox Code Playgroud)

architecture diagram uml modeling dataflow

0
推荐指数
1
解决办法
40
查看次数

通过 BufferBlock 的背压不起作用。(C# TPL 数据流)

典型情况:生产者快,消费者慢,需要让生产者慢下来
无法按我预期工作的示例代码(解释如下):

//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, …
Run Code Online (Sandbox Code Playgroud)

c# dataflow task-parallel-library tpl-dataflow

0
推荐指数
1
解决办法
593
查看次数

如何使用 Dataflow 将 Google Pub/Sub 中的数据批量处理到 Cloud Storage?

我正在构建一个从 MYSQL 数据库读取数据并在 BigQuery 中创建副本的变更数据捕获管道。我将在 Pub/Sub 中推送更改并使用 Dataflow 将它们传输到 Google Cloud Storage。我已经能够弄清楚如何流式传输更改,但是我需要对数据库中的几个表运行批处理。

在从 Pub/Sub 等无限源读取时,能否使用 Dataflow 运行批处理作业?我可以运行此批处理作业以将数据从 Pub/Sub 传输到 Cloud Storage,然后将此数据加载到 BigQuery 吗?我想要一个批处理作业,因为流作业成本更高。

dataflow batch-processing google-cloud-storage google-bigquery google-cloud-pubsub

0
推荐指数
1
解决办法
1754
查看次数

GCP 数据流的推送与拉取

我想知道应该在 GCP pubsub 中创建什么类型的订阅,以便处理来自 pubsub 主题的高频数据。我将以每秒 100 多条消息的速度在数据流中摄取数据。拉或推订阅真的很重要,以及它将如何影响速度等等。

dataflow google-cloud-platform google-cloud-pubsub apache-beam

-1
推荐指数
1
解决办法
1591
查看次数