标签: stream-processing

Apache Apex vs Apache Flink

由于两者都是流式框架,一次处理事件,这两种技术/流式框架之间的核心架构差异是什么?

还有哪些特定用例,哪一个比另一个更合适?

stream-processing apache-flink apache-apex

6
推荐指数
1
解决办法
2085
查看次数

为什么 Apache Flink 需要 Watermarks 来进行事件时间处理?

有人可以正确解释事件时间戳和水印。我从文档中理解它,但不是很清楚。现实生活中的例子或外行定义会有所帮助。另外,如果可以的话,请举一个例子(连同一些可以解释它的代码片段)。提前致谢

stream-processing apache-flink

6
推荐指数
1
解决办法
831
查看次数

流处理架构

我正在设计一个系统,其中有一个主要的对象流,并且有多个工人从该对象产生一些结果.最后,有一些特殊/独特的工作者(根据图论)的某种"接收器",它获取所有结果,并将它们处理成写入某个DB的最终对象.

工人可能依赖于其他一些工人的结果(因此,等待他们的结果)

现在,我面临几个问题:

  1. 可能是一名工人比另一名工人慢得多.你怎么处理那件事呢?添加更慢类型的更多工作者(=缩放)?(也许是动态的)
  2. 假设W_B依赖于W_A.如果W_B由于某种原因而关闭,则流程将停止,系统将停止工作.所以我想让系统以某种方式绕过这个工人.
  3. 此外,最终工作人员如何决定何时对结果进行操作?假设它有A和B的结果,但缺少C的结果.可能是C下降或者此刻它只是非常慢.它怎么能做出决定?

值得一提的是,它不是实时应用程序,而是离线处理系统(即您可以访问数据库并更改记录),但与此同时,它必须以"高速度"处理相对大量的对象".

关于技术,
我正在使用Java开发系统,但我并没有受到特定技术的限制.

如果你能帮助我完成系统的总体设计,我会很高兴的.

非常感谢!

java stream-processing bigdata system-design event-stream-processing

6
推荐指数
1
解决办法
122
查看次数

Flink Windows 边界、水印、事件时间戳和处理时间

问题定义和建立概念

比方说,我们有一个TumblingEventTimeWindow大小5分钟。我们有包含2 条基本信息的事件:

  • 数字
  • 事件时间戳

在这个例子中,我们在工作人员机器的挂钟时间下午 12:00启动我们的Flink拓扑(当然工作人员可能有不同步的时钟,但这超出了本问题的范围)。该拓扑包含一个处理运算符,其职责是汇总属于每个窗口的事件值和一个与此问题无关的 KAFKA Sink。

  • 这个窗口有一个BoundedOutOfOrdernessTimestampExtractor,允许延迟一分钟
  • 水印:据我所知,Flink 和 Spark Structured Stream 中的水印定义为(max-event-timestamp-seen-so-far - allowed-lateness)。任何事件时间戳小于或等于此水印的事件都将在结果计算中被丢弃和忽略。

第 1 部分(确定窗口的边界)

快乐(实时)路径

在这种情况下,几个事件到达Flink Operator,具有不同的事件时间戳12:01 - 12:09。此外,事件时间戳与我们的处理时间相对一致(如下面的 X 轴所示)。由于我们正在处理EVENT_TIME特性,因此应通过其事件时间戳来确定偶数是否属于特定事件。

在此处输入图片说明

旧数据涌入

在那个流程中,我假设两个翻滚窗口边界是并且仅仅因为我们在12:00开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么,其中几个事件带有更旧的事件时间戳,并且我们在12:00再次启动了拓扑?(足够老,我们的迟到津贴不包括他们)。类似于以下内容:12:00 -- …

watermark stream-processing apache-flink flink-streaming

6
推荐指数
1
解决办法
1060
查看次数

有效地将结果流存储在多个表中,每个项目都带有乐观锁定

给定一个包含大量项目的结果流,我想存储它们并处理潜在的并发冲突:

public void onTriggerEvent(/* params */) {
  Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
  List<Result> conflicts = store(results);
  resolveConflictsInNewTransaction(conflicts);
}
Run Code Online (Sandbox Code Playgroud)

被困在如何store(...)有效地实施。在Result由描述数据,需要在其各自的数据库表被更新这两件不更改和拆卸对象。

@Value
public static class Result {
    A a; // describes update for row in table a
    B b; // describes update for row in table b
}
Run Code Online (Sandbox Code Playgroud)

A并且B每个引用两个用户,其中(u1, u2)是各自数据库表上的一个键。

@Value
public static class A {
    long u1;
    long u2;
   // ... computed data fields ...
} …
Run Code Online (Sandbox Code Playgroud)

java postgresql spring hibernate stream-processing

6
推荐指数
1
解决办法
173
查看次数

拉米娜与风暴

我正在设计一个原型实时监视器,用于处理相当大量(> 30G /天)的流数字数据.我想在Clojure中写这个,因为该语言似乎非常适合那种"观察者+状态机"系统,这可能最终会成为.

我找到的两个主要候选人是Lamina和Storm.还有Riemann和Pulse,但前者似乎更像是一个完整的解决方案,而不是一个框架,我宁愿不承诺最终的设计; Pulse的回购看起来有点不受维护?

我想知道的是; 这两个项目针对哪些数据和工作流程进行了优化?风暴似乎更成熟,但Lamina似乎更具有组合性和"Clojureic"(我的背景是Python,所以我倾向于高度评价).

我在网上看到的内容:

  • Storm似乎是以Big Data(流)为重点,核心是带有Clojure DSL的直接Java.它似乎具有针对许多现有数据源的pre = built处理程序.

  • Lamina更像是一个轻量级,可重用的组件,可以对Clojure进行编码以实现抽象,这意味着它可以作为其他事件系统的基础重用.数据源需要在代码中处理.

  • 两者都有一组有用的聚合/分裂/计算库函数.Lamina的graphviz集成是一个不错的选择.

events clojure stream-processing lamina-clojure apache-storm

5
推荐指数
2
解决办法
785
查看次数

如何用 C++ 编写自定义流转换?

在大量使用 Haskell 和函数式语言之后,我开始学习 C++,我发现我一直在尝试解决同样的问题:

  • 从输入流中读取一些数据
  • 根据特定算法对它们进行标记
  • 处理令牌

如果这是 Haskell,我可以简单地利用一切都是懒惰的事实,并在我想到的时候编写我的转换,然后它会在下游被消耗时应用。甚至有一些库可以执行这种精确的模式(导管管道)。

假设我想获取序列1 2 3 4 5 6 ...和输出12 34 56 ...。我可以了解如何编写在流上运行并就地处理数据的临时代码。但我想知道是否存在一种抽象机制,允许我通过转换来自另一个流的数据(以任何可以想到的方式)来构建新的流。这种抽象应该允许我在处理数据时缓冲数据,而不仅仅是单个元素到新值的简单映射。

以下是限制:

  • 除了 stdlib 之外,我无法使用任何其他库。
  • 它必须适用于 C++03(意味着没有 C++11 功能。)

如果你在想,这是作业吗?好吧,我收到了很多类作业,这些作业要求我处理数据流(这就是没有库和 C++03 限制的原因)。并不是我不知道如何使用while循环来做到这一点,而是我想知道 stl 中是否存在现有的流抽象,只是等待被发现和使用。

但如果唯一的方法是使用 C++11,那么我想知道。

c++ stream stream-processing c++03 c++98

5
推荐指数
1
解决办法
1107
查看次数

通过从Graph创建Source,Akka-stream UnsupportedOperationException

我正在尝试使用*subFlows连接流.因此,我从广播的出口建立了一个来源.但它抛出了一个UnsupportedOperationException: cannot replace the shape of the EmptyModule.我试图谷歌这个例外,但我找不到类似的东西.

在这里我的代码

val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
    val bc = builder.add(Broadcast[Long](1))

    val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
    val merge = builder.add(Merge[Seq[StreamMeasurement]](1))


    broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc

    val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
      SourceShape(bc.out(0))
    })

    broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
        .map(_._2)
        .zip[Long](source)
        .takeWhile(deciderFunction)
        .map(_._1)
        .fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq:+sm)
      .mergeSubstreams ~>  merge



    FlowShape(broadcast.in, merge.out)
  })
Run Code Online (Sandbox Code Playgroud)

在这里得到的例外情况: …

scala stream-processing akka akka-stream

5
推荐指数
0
解决办法
182
查看次数

在 Flink 中的操作员之间共享状态

我想知道在 Flink 中是否可以在运营商之间共享状态。

例如,假设我在操作符上按键进行分区,并且我需要分A区内的一段分区状态C(出于任何原因)(图 1.a),或者我需要C下游操作符中的操作符状态F(图 1 .b)

在此处输入图片说明

我知道可以broadcast记录到所有分区。因此,如果您在记录中包含操作符的内部状态,则可以与下游操作符共享您的内部状态。
然而,这可能是一个昂贵的操作,而不是简单地让op1具体请求op2状态。

最近围绕可查询状态的发展是否朝着这个概念发展,或者它们只是为了让外部用户查询拓扑的内部状态?

预先感谢您的见解

dataflow stream-processing apache-flink

5
推荐指数
1
解决办法
1498
查看次数

jq Streaming - 过滤嵌套列表并保留全局结构

在一个大型 json 文件中,我想从嵌套列表中删除一些元素,但保留文档的整体结构。

我的示例将其输入为(但真实的输入足够大以要求流式传输)。

{
  "keep_untouched": {
    "keep_this": [
      "this",
      "list"
    ]
  },
  "filter_this":
  [
    {"keep" : "true"},
    {
      "keep": "true",
      "extra": "keeper"
    } ,
    {
      "keep": "false",
      "extra": "non-keeper"
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

所需的输出仅删除了“filter_this”块的一个元素:

{
  "keep_untouched": {
    "keep_this": [
      "this",
      "list"
    ]
  },
  "filter_this":
  [
    {"keep" : "true"},
    {
      "keep": "true",
      "extra": "keeper"
    } ,
  ]
}
Run Code Online (Sandbox Code Playgroud)

处理此类情况的标准方法似乎是使用“truncate_stream”来重构流对象,然后再以通常的 jq 方式过滤这些对象。具体来说,命令:

jq -nc --stream 'fromstream(1|truncate_stream(inputs))' 
Run Code Online (Sandbox Code Playgroud)

提供对对象流的访问:

{"keep_this":["this","list"]}
[{"keep":"true"},{"keep":"true","extra":"keeper"}, 
 {"keep":"false","extra":"non-keeper"}]
Run Code Online (Sandbox Code Playgroud)

此时很容易过滤所需的对象。但是,这会从其父对象的上下文中剥离结果,这不是我想要的。

查看流式结构:

[["keep_untouched","keep_this",0],"this"]
[["keep_untouched","keep_this",1],"list"]
[["keep_untouched","keep_this",1]]
[["keep_untouched","keep_this"]]
[["filter_this",0,"keep"],"true"]
[["filter_this",0,"keep"]]
[["filter_this",1,"keep"],"true"]
[["filter_this",1,"extra"],"keeper"]
[["filter_this",1,"extra"]]
[["filter_this",2,"keep"],"false"] …
Run Code Online (Sandbox Code Playgroud)

json stream-processing bigdata jq

5
推荐指数
1
解决办法
1794
查看次数