由于两者都是流式框架,一次处理事件,这两种技术/流式框架之间的核心架构差异是什么?
还有哪些特定用例,哪一个比另一个更合适?
有人可以正确解释事件时间戳和水印。我从文档中理解它,但不是很清楚。现实生活中的例子或外行定义会有所帮助。另外,如果可以的话,请举一个例子(连同一些可以解释它的代码片段)。提前致谢
我正在设计一个系统,其中有一个主要的对象流,并且有多个工人从该对象产生一些结果.最后,有一些特殊/独特的工作者(根据图论)的某种"接收器",它获取所有结果,并将它们处理成写入某个DB的最终对象.
工人可能依赖于其他一些工人的结果(因此,等待他们的结果)
现在,我面临几个问题:
值得一提的是,它不是实时应用程序,而是离线处理系统(即您可以访问数据库并更改记录),但与此同时,它必须以"高速度"处理相对大量的对象".
关于技术,
我正在使用Java开发系统,但我并没有受到特定技术的限制.
如果你能帮助我完成系统的总体设计,我会很高兴的.
非常感谢!
java stream-processing bigdata system-design event-stream-processing
比方说,我们有一个TumblingEventTimeWindow与大小5分钟。我们有包含2 条基本信息的事件:
在这个例子中,我们在工作人员机器的挂钟时间下午 12:00启动我们的Flink拓扑(当然工作人员可能有不同步的时钟,但这超出了本问题的范围)。该拓扑包含一个处理运算符,其职责是汇总属于每个窗口的事件值和一个与此问题无关的 KAFKA Sink。
在这种情况下,几个事件到达Flink Operator,具有不同的事件时间戳12:01 - 12:09。此外,事件时间戳与我们的处理时间相对一致(如下面的 X 轴所示)。由于我们正在处理EVENT_TIME特性,因此应通过其事件时间戳来确定偶数是否属于特定事件。
在那个流程中,我假设两个翻滚窗口的边界是并且仅仅因为我们在12:00开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么,其中几个旧事件带有更旧的事件时间戳,并且我们在12:00再次启动了拓扑?(足够老,我们的迟到津贴不包括他们)。类似于以下内容:12:00 -- …
给定一个包含大量项目的结果流,我想存储它们并处理潜在的并发冲突:
public void onTriggerEvent(/* params */) {
Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
List<Result> conflicts = store(results);
resolveConflictsInNewTransaction(conflicts);
}
Run Code Online (Sandbox Code Playgroud)
我被困在如何store(...)有效地实施。在Result由描述数据,需要在其各自的数据库表被更新这两件不更改和拆卸对象。
@Value
public static class Result {
A a; // describes update for row in table a
B b; // describes update for row in table b
}
Run Code Online (Sandbox Code Playgroud)
A并且B每个引用两个用户,其中(u1, u2)是各自数据库表上的一个键。
@Value
public static class A {
long u1;
long u2;
// ... computed data fields ...
} …Run Code Online (Sandbox Code Playgroud) 我正在设计一个原型实时监视器,用于处理相当大量(> 30G /天)的流数字数据.我想在Clojure中写这个,因为该语言似乎非常适合那种"观察者+状态机"系统,这可能最终会成为.
我找到的两个主要候选人是Lamina和Storm.还有Riemann和Pulse,但前者似乎更像是一个完整的解决方案,而不是一个框架,我宁愿不承诺最终的设计; Pulse的回购看起来有点不受维护?
我想知道的是; 这两个项目针对哪些数据和工作流程进行了优化?风暴似乎更成熟,但Lamina似乎更具有组合性和"Clojureic"(我的背景是Python,所以我倾向于高度评价).
我在网上看到的内容:
Storm似乎是以Big Data(流)为重点,核心是带有Clojure DSL的直接Java.它似乎具有针对许多现有数据源的pre = built处理程序.
Lamina更像是一个轻量级,可重用的组件,可以对Clojure进行编码以实现抽象,这意味着它可以作为其他事件系统的基础重用.数据源需要在代码中处理.
两者都有一组有用的聚合/分裂/计算库函数.Lamina的graphviz集成是一个不错的选择.
events clojure stream-processing lamina-clojure apache-storm
在大量使用 Haskell 和函数式语言之后,我开始学习 C++,我发现我一直在尝试解决同样的问题:
如果这是 Haskell,我可以简单地利用一切都是懒惰的事实,并在我想到的时候编写我的转换,然后它会在下游被消耗时应用。甚至有一些库可以执行这种精确的模式(导管和管道)。
假设我想获取序列1 2 3 4 5 6 ...和输出12 34 56 ...。我可以了解如何编写在流上运行并就地处理数据的临时代码。但我想知道是否存在一种抽象机制,允许我通过转换来自另一个流的数据(以任何可以想到的方式)来构建新的流。这种抽象应该允许我在处理数据时缓冲数据,而不仅仅是单个元素到新值的简单映射。
以下是限制:
如果你在想,这是作业吗?好吧,我收到了很多类作业,这些作业要求我处理数据流(这就是没有库和 C++03 限制的原因)。并不是我不知道如何使用while循环来做到这一点,而是我想知道 stl 中是否存在现有的流抽象,只是等待被发现和使用。
但如果唯一的方法是使用 C++11,那么我想知道。
我正在尝试使用*subFlows连接流.因此,我从广播的出口建立了一个来源.但它抛出了一个UnsupportedOperationException: cannot replace the shape of the EmptyModule.我试图谷歌这个例外,但我找不到类似的东西.
在这里我的代码
val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
val bc = builder.add(Broadcast[Long](1))
val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
val merge = builder.add(Merge[Seq[StreamMeasurement]](1))
broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc
val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
SourceShape(bc.out(0))
})
broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
.map(_._2)
.zip[Long](source)
.takeWhile(deciderFunction)
.map(_._1)
.fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq:+sm)
.mergeSubstreams ~> merge
FlowShape(broadcast.in, merge.out)
})
Run Code Online (Sandbox Code Playgroud)
在这里得到的例外情况: …
我想知道在 Flink 中是否可以在运营商之间共享状态。
例如,假设我在操作符上按键进行分区,并且我需要分A区内的一段分区状态C(出于任何原因)(图 1.a),或者我需要C下游操作符中的操作符状态F(图 1 .b)
我知道可以broadcast记录到所有分区。因此,如果您在记录中包含操作符的内部状态,则可以与下游操作符共享您的内部状态。
然而,这可能是一个昂贵的操作,而不是简单地让op1具体请求op2状态。
最近围绕可查询状态的发展是否朝着这个概念发展,或者它们只是为了让外部用户查询拓扑的内部状态?
预先感谢您的见解
在一个大型 json 文件中,我想从嵌套列表中删除一些元素,但保留文档的整体结构。
我的示例将其输入为(但真实的输入足够大以要求流式传输)。
{
"keep_untouched": {
"keep_this": [
"this",
"list"
]
},
"filter_this":
[
{"keep" : "true"},
{
"keep": "true",
"extra": "keeper"
} ,
{
"keep": "false",
"extra": "non-keeper"
}
]
}
Run Code Online (Sandbox Code Playgroud)
所需的输出仅删除了“filter_this”块的一个元素:
{
"keep_untouched": {
"keep_this": [
"this",
"list"
]
},
"filter_this":
[
{"keep" : "true"},
{
"keep": "true",
"extra": "keeper"
} ,
]
}
Run Code Online (Sandbox Code Playgroud)
处理此类情况的标准方法似乎是使用“truncate_stream”来重构流对象,然后再以通常的 jq 方式过滤这些对象。具体来说,命令:
jq -nc --stream 'fromstream(1|truncate_stream(inputs))'
Run Code Online (Sandbox Code Playgroud)
提供对对象流的访问:
{"keep_this":["this","list"]}
[{"keep":"true"},{"keep":"true","extra":"keeper"},
{"keep":"false","extra":"non-keeper"}]
Run Code Online (Sandbox Code Playgroud)
此时很容易过滤所需的对象。但是,这会从其父对象的上下文中剥离结果,这不是我想要的。
查看流式结构:
[["keep_untouched","keep_this",0],"this"]
[["keep_untouched","keep_this",1],"list"]
[["keep_untouched","keep_this",1]]
[["keep_untouched","keep_this"]]
[["filter_this",0,"keep"],"true"]
[["filter_this",0,"keep"]]
[["filter_this",1,"keep"],"true"]
[["filter_this",1,"extra"],"keeper"]
[["filter_this",1,"extra"]]
[["filter_this",2,"keep"],"false"] …Run Code Online (Sandbox Code Playgroud) apache-flink ×4
bigdata ×2
java ×2
akka ×1
akka-stream ×1
apache-apex ×1
apache-storm ×1
c++ ×1
c++03 ×1
c++98 ×1
clojure ×1
dataflow ×1
events ×1
hibernate ×1
jq ×1
json ×1
postgresql ×1
scala ×1
spring ×1
stream ×1
watermark ×1