Kafka Streams内置了哪些功能,允许将单个输入流动态连接到多个输出流?KStream.branch允许基于真/假谓词的分支,但这不是我想要的.我希望每个传入日志确定它将在运行时{"date": "2017-01-01"}流式传输到的主题,例如,日志将流式传输到主题topic-2017-01-01,日志{"date": "2017-01-02"}将流式传输到主题topic-2017-01-02.
我可以调用forEach流,然后写一个Kafka制作人,但这似乎并不优雅.在Streams框架中有更好的方法吗?
有没有办法用Kafka Stream手动提交?
通常使用KafkaConsumer,我做类似下面的事情:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
Run Code Online (Sandbox Code Playgroud)
我手动调用提交的地方.我没有看到类似的API KStream.
Kafka Streams应用程序只能与此配置值指定的单个Kafka群集通信.未来版本的Kafka Streams将支持连接到不同的Kafka集群,以读取输入流和编写输出流.
这是否意味着我的整个应用程序只能连接到单个Kafka群集,或者每个KafkaStreams实例只能连接到一个群集?
我可以创建多个具有连接到不同群集的不同属性的KafkaStreams实例吗?
如何从单个主题创建多个流?当我做这样的事情时:
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
Run Code Online (Sandbox Code Playgroud)
我是否需要为"master"中的每个流创建另一个KafkaStream实例?
我正在使用 kafka 版本 2.4.1(最近从 2.2.0 升级到 2.4.1)并注意到一个奇怪的问题。
即使应用程序(kafka 流)已关闭(没有正在运行的应用程序),但消费者组命令将状态返回为重新平衡。我们的应用程序作为 kubernetes pod 运行。
root@bastion-0:# ./kafka-consumer-groups --describe --group groupname --bootstrap-server kafka-0.local:9094
Warning: Consumer group 'groupname' is rebalancing.
Run Code Online (Sandbox Code Playgroud)
我现在已经等了一段时间(30 分钟),即使应用程序关闭,命令仍然报告“重新平衡”。
即使我尝试删除该组,它也会给出以下消息。
root@bastion-0:/app/kafka_2.12-2.4.1/bin# ./kafka-consumer-groups.sh --delete --group group1 --bootstrap-server kafka.local:9094
Error: Deletion of some consumer groups failed:
* Group 'group1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
root@bastion-0:/app/kafka_2.12-2.4.1/bin# ./kafka-consumer-groups.sh --delete --group group2 --bootstrap-server kafka.local:9094
Error: Deletion of some consumer groups failed:
* Group 'group2' could not be deleted due …Run Code Online (Sandbox Code Playgroud) 我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以获得高可用性,以防其中一个来源失败.我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复.
使用leftJoinKStream 的方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容.这似乎是因为,根据Kafka Streams开发人员指南,
KStream-KStream leftJoin始终由来自主流的记录驱动
因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在.主流重新联机后,输出将恢复正常.
我也尝试使用outerJoin(添加重复记录),然后转换为KTable和groupByKey以消除重复,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
Run Code Online (Sandbox Code Playgroud)
但我偶尔也会重复一遍.我也commit.interval.ms=200用来让KTable经常发送到输出流.
接近此合并以从多个相同的输入主题获得一次输出的最佳方法是什么?
在Kafka Stream库中,我想知道KTable和GlobalKTable之间的区别.
同样在KStream类中,有两种方法leftJoin()和outerJoin().这两种方法有什么区别呢?
我读过KStream.leftJoin,但没有找到确切的区别.
这些实体之间有什么区别?
我认为,KTable-具有compaction删除策略的简单kafka主题。另外,如果为KTable启用了日志记录,那么还将有changelog,然后删除策略为compaction,delete。
本地存储-基于RockDB的内存中键值缓存。但是本地商店也有一个变更日志。
在这两种情况下,我们都将在特定时间段(?)中获得密钥的最后一个值。本地存储用于聚合步骤,联接等。但是,紧随其后的还有创建具有压缩策略的新主题。
例如:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and …Run Code Online (Sandbox Code Playgroud) 我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:
2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH.
Run Code Online (Sandbox Code Playgroud)
我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。
知道如何解决吗?
我试图找出我的kafka-streams应用程序内存不足的原因。我已经发现RocksDB消耗了大量本机内存,我尝试使用以下配置对其进行限制:
# put index and filter blocks in blockCache to avoid letting them grow unbounded (https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks)
cache_index_and_filter_blocks = true;
# avoid evicting L0 cache of filter and index blocks to reduce performance impact of putting them in the blockCache (https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks)
pinL0FilterAndIndexBlocksInCache=true
# blockCacheSize should be 1/3 of total memory available (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size)
blockCacheSize=1350 * 1024 * 1024
# use larger blockSize to reduce index block size (https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk)
blockSize=256 * 1024
Run Code Online (Sandbox Code Playgroud)
但内存使用量似乎仍然无限增长,我的容器最终被 OOMKilled。
我使用 jemalloc 来分析内存使用情况(如此处所述),结果清楚地表明 …