标签: dataflow

有没有人在主流语言的真实项目中使用数据流编程?

我正在考虑在clojure程序中使用一些Dataflow编程技术,但是我很难从使用Java,C#或其他在现实世界中使用这些技术的主流语言的项目中找到大量信息.如果有人有任何关于此事可以分享的经验,我将不胜感激.

dataflow clojure

6
推荐指数
1
解决办法
1886
查看次数

如何使用clang或其他工具生成数据流图?

使用clang和graphviz,我可以生成一些C/C++代码的调用图,如本答案中所述.

现在我需要一个在非常大的代码库(大部分是C)上计算的数据流图,这个代码库是一个cmake用作构建工具的软件.

所以我的问题是,鉴于数据结构的名称,我如何使用/实现这种结构来检索函数和文件的名称?

Libtool对于clang项目中的一些数据流挖掘算法有一些稀疏的引用(甚至不确定它是否稳定或正在开发中),但我发现clang它本身没有scan-build.

我如何能够生成这条信息?我真的需要这个,给定一个名称我想要检索代码中使用的位置,几乎我所评论的所有静态分析工具都集中在函数和方法上,我需要检查数据结构的用法clang.

编辑:

我也在考虑使用doxygen作为文档,所以如果doxygen的xml输出对某些工具有用,我可以使用它.

c dataflow

6
推荐指数
1
解决办法
1966
查看次数

TPL Dataflow块消耗所有可用内存

我有TransformManyBlock以下设计:

  • 输入:文件的路径
  • 输出:IEnumerable文件的内容,一次一行

我在一个巨大的文件(61GB)上运行这个块,这个文件太大而无法放入RAM中.为了避免无限制的内存增长,我BoundedCapacity为这个块和所有下游块设置了一个非常低的值(例如1).尽管如此,该块显然会贪婪地迭代IEnumerable,它消耗了计算机上的所有可用内存,使每个进程停止运行.在我杀死进程之前,块的OutputCount继续无限制地上升.

我该怎么做才能防止块IEnumerable以这种方式消耗?

编辑:这是一个示例程序,说明了问题:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
1951
查看次数

IO读写操作的TPL Dataflow实现中的内存问题

我试图使用文件IO操作实现读写操作,并将这些操作封装在一起TransformBlock,以使这些操作线程安全,而不是使用锁定机制.

但问题是,当我尝试并行写入5个文件时,有一个异常的内存,并且在使用此实现时它阻止了UI线程.该实现在Windows Phone项目中完成.请说明这个实现有什么问题.

文件IO操作

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName …
Run Code Online (Sandbox Code Playgroud)

c# file-io multithreading dataflow tpl-dataflow

6
推荐指数
1
解决办法
499
查看次数

谷歌数据流管道中的数据存储输入是否可以一次处理一批N个条目?

我正在尝试执行数据流管道作业,该作业将从数据存储区一次N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次100个批处理条目发送到某些外部REST服务.

我目前的解决方案

  1. 从数据存储读取输入
  2. 创建与管道选项中指定的工作者一样多的键(1 worker = 1键).
  3. 按键分组,以便我们将迭代器作为输出(步骤4中的迭代器输入)
  4. 以编程方式批处理临时列表中的用户,并将它们作为批处理发送到REST端点.

上面描述的伪代码场景(忽略细节):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-datastore gcloud google-cloud-dataflow

6
推荐指数
1
解决办法
1092
查看次数

当消费者不堪重负时,如何让快速生产者暂停?

我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式.我有大数据流网格,其中有大约40个块.网格中有两个主要功能部分:生产者部分和消费者部分.生产者应该继续为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作.当消费者忙于一些指定数量的工作项时,我想暂停生产者.否则,该应用程序会占用大量内存/ CPU,并且行为不可持续.

我制作了演示应用程序以演示此问题:

啮合

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };

            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);

            var consumerBlock = new …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

6
推荐指数
1
解决办法
806
查看次数

GCP数据流:从发布/订阅IO流式传输的系统延迟

我们使用“系统延迟”来检查我们的数据流作业的运行状况。例如,如果看到系统延迟增加,我们将尝试查看如何降低该指标。关于该指标几乎没有疑问。

  • 1)系统滞后到底意味着什么?

数据项等待处理的最长时间

以上是我们点击信息图标后在GCP控制台中看到的内容。在这种情况下,数据项是什么意思?流处理具有“窗口化”,事件时间与处理时间,水印等概念。什么时候考虑将某个项目等待处理?例如,仅仅是消息何时到达而不论其状态如何?

  • 2)此指标的最佳阈值是多少?

我们试图将这一指标保持在尽可能低的水平,但是对于将其保持在最低水平我们没有任何建议。例如,我们是否有一些建议,例如将系统延迟保持在20s到30s之间是最佳的。

  • 3)系统滞后如何影响汇

系统延迟如何影响事件本身的延迟?

streaming dataflow google-cloud-platform google-cloud-dataflow

6
推荐指数
1
解决办法
1743
查看次数

如何跳过SSIS数据流中的最后一行

我在我的数据流中使用FlatFile Source Manager- > Script COmponent as Trans- > OLEDB destination.

Source从平面文件中读取所有行,我想跳过更新数据库的最后一行(Trailer record).

由于它包含NULL值,因此数据库会引发错误.

请帮我解决这个问题.

问候,VHK

sql-server ssis etl dataflow ssis-2012

6
推荐指数
2
解决办法
5286
查看次数

数据流设置控制器服务帐户

我尝试为Dataflow设置控制器服务帐户。在我的数据流选项中,我有:

options.setGcpCredential(GoogleCredentials.fromStream(new FileInputStream("key.json")).createScoped(someArrays)); 
options.setServiceAccount("xxx@yyy.iam.gserviceaccount.com");
Run Code Online (Sandbox Code Playgroud)

但我得到:

WARNING: Request failed with code 403, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://dataflow.googleapis.com/v1b3/projects/MYPROJECT/locations/MYLOCATION/jobs
Exception in thread "main" java.lang.RuntimeException: Failed to create a workflow job: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com. Causes: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com.
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:791)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
Run Code Online (Sandbox Code Playgroud)

...

Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-platform google-cloud-dataflow google-cloud-iam

6
推荐指数
2
解决办法
1525
查看次数

Apache Beam 中的架构是什么?

我正在阅读有关 Apache BEAM 中的SCHEMAS 的文档,但我无法理解它的目的是什么、如何、为什么或在哪些情况下我应该使用它们。使用模式或使用扩展 Serialized 接口的类有什么区别?

该文档有一个示例:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public String bank;
  public double purchaseAmount;
}
Run Code Online (Sandbox Code Playgroud)
PCollection<TransactionPojos> transactionPojos = readTransactionsAsPojo();
Run Code Online (Sandbox Code Playgroud)

但它没有解释readTransactionsAsPojo函数是如何构建的。我认为对此有很多遗漏的解释。

dataflow apache-beam

5
推荐指数
1
解决办法
1589
查看次数