小编vic*_*tim的帖子

Kafka Streams如何使用包含不完整数据的分区?

Kafka Streams引擎将分区映射到一个工作者(即Java App),以便该分区中的所有消息都由该工作者处理.我有以下场景,并试图了解它是否仍然可行.

我有一个主题A(有3个分区).发送给它的消息由Kafka随机分区(即没有密钥).我发送给它的消息有一个如下的架构

{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
Run Code Online (Sandbox Code Playgroud)

由于我有3个分区,并且消息在它们之间随机分区,因此可以将相同型号的汽车写入不同的分区.例如

P1
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Toyota", color: "Blue", timeStampEpoch: 14334343342}

P2
{carModel: "Toyota", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}

P3
{carModel: "Nissan", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
Run Code Online (Sandbox Code Playgroud)

现在让我们说我想计算carModel看到的汽车总数.我编写了一个Kafka Streams应用程序,它监听主题A,通过carModel映射消息,即

carStream.map((key, value) -> KeyValue.pair(value["carModel"], value))
Run Code Online (Sandbox Code Playgroud)

并将总数写入另一个主题B,即表单的消息 …

apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
1691
查看次数

Apache Flink 中是否有相当于 Kafka 的 KTable 的东西?

Apache Kafka 有一个 KTable 的概念,其中

其中每个数据记录代表一个更新

本质上,我可以使用一个 kafka 主题,并且只保留每个键的最新消息。

Apache Flink 中是否有类似的概念?我已经阅读了Flink 的 Table API,但似乎没有解决同样的问题。

一些帮助比较和对比这两个框架会很有帮助。我并不是在寻找哪个更好或更差。而是它们有何不同。正确的答案取决于我的要求。

apache-flink apache-kafka-streams

5
推荐指数
1
解决办法
1475
查看次数

使用RichAggregateFunction时出现Flink错误

我正在尝试在Flink中使用抽象RichAggregateFunction的实现。我希望它是“丰富的”,因为我需要将某些状态存储为聚合器的一部分,并且可以执行此操作,因为我可以访问运行时上下文。我的代码如下所示:

stream.keyBy(...)
   .window(GlobalWindows.create())
   .trigger(...)
   .aggregate(new MyRichAggregateFunction());
Run Code Online (Sandbox Code Playgroud)

但是,我得到一个UnsupportedOperationException说

此聚合函数不能是RichFunction。

我显然没有正确使用RichAggregateFunction。有任何如何正确使用它的示例吗?还是应该将ProcessFunction用于此类操作?

谢谢

java apache-flink flink-streaming

4
推荐指数
1
解决办法
469
查看次数

Flink错误-密钥组不在KeyGroupRange中

我正在使用RocksDB作为状态后端运行Flink图。对于图形中的一个联接运算符,我得到以下异常。(实际的组号当然因运行而异)。

java.lang.IllegalArgumentException:密钥组45不在KeyGroupRange {startKeyGroup = 0,endKeyGroup = 42}中。

我的接线员也不是如下

Source1 -----> Map1A ---> KeyBy--->\___ >
        \----> Map1B ---> KeyBy--->-----> Join1AB ---->
                                                \____>
Source2 ----->------------KeyBy---> -----------------> Join2,1AB ---->
Run Code Online (Sandbox Code Playgroud)

在Join2,1AB运算符中引发了错误,该运算符将(a)Join1AB的结果与(keyed)source2连接起来。

任何想法可能是什么原因造成的?我在下面有完整的stacktrace,并且我知道这仍然很模糊-但是任何朝着正确方向的指针都值得赞赏。

Caused by: java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
        at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
        at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:664)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:521)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:417)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.]
Run Code Online (Sandbox Code Playgroud)

编辑:如果我将状态后端更改为文件系统(即FsStateBackend),则将得到以下堆栈跟踪。关键组索引问题。

java.lang.IllegalArgumentException: Key group index out of …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

4
推荐指数
1
解决办法
456
查看次数