标签: stream-processing

Kafka Streams 在处理时间窗口内排序

我想知道是否有任何方法可以使用 Kafka Streams DSL 或 Processor API 对窗口内的记录进行排序。

以以下情况为例(任意一种,但与我需要的类似):

  1. 有一些事件的 Kafka 主题,比如说用户点击。假设主题有 10 个分区。消息按键进行分区,但每个键都是唯一的,因此它是一种随机分区。每条记录都包含一个用户 ID,稍后会使用该用户 ID 对流进行重新分区。

  2. 我们消费流,并将每条消息发布到另一个主题,根据用户 ID 对记录进行分区(按用户 ID 重新分区原始流)。

  3. 然后我们消费这个重新分区的流,我们将消费的记录存储在窗口化 10 分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区,但顺序没有保证,因为原始主题有10个分区。

  4. 我了解Kafka Streams的窗口模型,当新记录进来时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需要能够对缓冲进行排序事件,并按顺序将它们发送到另一个主题。

注意:

  1. 我们需要能够使用处理时间而不是事件时间来刷新/处理窗口内的记录。我们不能等待下一次点击来提前时间,因为它可能永远不会发生。

  2. 我们需要从商店中删除所有记录,尽快对窗口进行排序和刷新。

  3. 如果应用程序崩溃,我们需要恢复(在应用程序的同一个或另一个实例中)并处理所有未处理的窗口,而不需要等待特定用户的新记录。

我知道 Kafka Streams 1.0.0 允许在处理 API 中使用挂钟时间,但我不确定实现我需要的正确方法是什么(更重要的是考虑到上述恢复过程要求)。

stream-processing apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
3943
查看次数

流媒体:滚动窗口与微批处理

流处理中 5 秒的滚动窗口与微批处理时 5 秒的微批次有何不同?两者都有一个 5 秒的非重叠窗口,在此期间它们处理记录,然后继续前进。

据我所知,流处理中有一个时间概念:事件、摄取和处理时间。我们是否可以推断使用微批处理的流处理只不过是使用具有摄取时间或处理时间的滚动窗口的流处理?

stream-processing apache-spark spark-streaming apache-flink flink-streaming

5
推荐指数
1
解决办法
956
查看次数

无法连接到 id 为 1 的节点:[Worker]:错误:ConnectionError('没有连接到 id 为节点')

我正在尝试使用 robinhood / faust 但没有成功!

我已经创建了一个生产者,它成功地插入到我的 confluence-kafka localhost 实例中的原始主题中!

但 faust 无法连接到本地主机。

我的应用程序.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try: …
Run Code Online (Sandbox Code Playgroud)

python stream-processing faust

5
推荐指数
1
解决办法
5388
查看次数


创建规则引擎的最佳设计模式

假设我必须设计一个规则引擎,根据静态配置规则,责任链在运行时发生变化。实现这个问题的最佳设计模式是什么?

例如,根据某些配置,流中的一组事件可以 (1) 过滤,(2) 划分为子集 (3) 修改

对于每个流,客户可以选择全部三个或三个中的一些或三个都不选择。

因此,我的组装管道应该在运行时根据每个流的配置进行配置。

哪个是实现这一目标的最佳设计模式?

java design-patterns rule-engine stream-processing

4
推荐指数
1
解决办法
3732
查看次数

Apache Flink 状态存储与 Kafka Streams

据我所知,Kafka Streams 会在内存、磁盘或 Kafka 主题中本地处理其状态,因为所有输入日期都来自分区,其中所有消息都由定义的值作为键控。大多数时候,计算可以在不知道其他处理器的状态的情况下完成。如果是这样,您就有另一个 Streams 实例来计算结果。就像这张照片一样:

在此输入图像描述

Flink 的状态到底存储在哪里?Flink 是否也可以在本地存储状态,或者总是将它们发布到所有实例(任务)?是否可以配置 Flink 以将状态存储在 Kafka Broker 中?

stream-processing apache-kafka apache-flink apache-kafka-streams

4
推荐指数
1
解决办法
1581
查看次数

kappa-architecture和lambda-architecture之间有什么区别

如果Kappa-Architecture直接对流进行分析而不是将数据分成两个流,那么数据存储在哪里,就像Kafka这样的消息系统?或者它可以在数据库中重新计算?

与使用流处理引擎重新计算以进行批量分析相比,单独的批处理层是否更快?

stream-processing bigdata batch-processing apache-kafka lambda-architecture

3
推荐指数
1
解决办法
1571
查看次数

Apache Flink:ProcessWindowFunction不适用

我想ProcessWindowFunction在我的Apache Flink项目中使用。但是使用过程函数时出现一些错误,请参见下面的代码片段

错误是:

在类型WindowedStream,元组,TimeWindow>的方法处理(ProcessWindowFunction,R,元组,TimeWindow>)是不适用的参数(JDBCExample.MyProcessWindows)

我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());
Run Code Online (Sandbox Code Playgroud)

我的ProcessWindowFunction

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
Run Code Online (Sandbox Code Playgroud)

stream-processing apache-flink flink-streaming

3
推荐指数
1
解决办法
478
查看次数

Flink窗口状态大小和状态管理

在阅读了flink的文档并四处搜寻之后,我无法完全理解flink的窗口中如何处理状态。可以说我有一个每小时运行的带有聚合函数的滚动窗口,该函数将msg累积到某些Java pojo或scala case类中。该窗口的大小将与在一小时内进入该窗口的事件的数量相关,或者仅将其与pojo / case类相关,因为会将事件累积到该对象中。(例如,如果将10000 msgs计数为整数,大小将接近10000 * msg大小还是int的大小?)此外,如果im使用pojos或case类,flink是否会为我处理状态(如果内存溢出到磁盘)在检查点用尽/保存状态等)还是我必须使用flink的状态对象?

谢谢你的帮助!

stream-processing apache-flink

3
推荐指数
1
解决办法
363
查看次数

Apache Flink 中的有状态函数

我研究了 Apache Flink 的新 Stateful Functions 2.0 API。我阅读了以下文档链接https://ci.apache.org/projects/flink/flink-statefun-docs-stable/。我还在 Git 存储库中运行了示例。( https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples )\n我对实现有几个问题。

\n\n

https://flink.apache.org/stateful-functions.html --> 页面末尾有一个示例,用于欺诈检测的交易评分。

\n\n

第一个问题是关于状态 TTL 的。我如何向 TTL 提供状态?示例表示:30 天后,\xe2\x80\x9cFraud Count\xe2\x80\x9d 函数将收到一条过期消息(来自其自身)并清除其状态。我应该阅读本手册还是还有其他功能?我该如何做这本手册?

\n\n

关于keyedstream的第二个问题。示例表示: \xe2\x80\x9cFraud Count\xe2\x80\x9d 的多个实例将存在 \xe2\x80\x94 例如,每个客户帐户一个。我应该给 赋值PersistedTable<K,V>吗?例如<customerid,count>。我可以清除特定键的状态吗?

\n\n

最后一个问题是关于窗口和水印的。如何将这些功能实现到 Stateful Functions 2.0?

\n

state stream-processing apache-flink flink-streaming flink-statefun

3
推荐指数
1
解决办法
1978
查看次数