我一直在尝试使用 Kafka 运行一个简单的字数统计应用程序,但是每当我运行它时,我都会收到以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/LogContext
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:630)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:610)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:557)
at StreamsApp.main(StreamsApp.java:49)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.LogContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
Run Code Online (Sandbox Code Playgroud)
我不知道为什么我不断收到此错误...主要方法的代码列在下面。(第 49 行)KafkaStreams Streams = new KafkaStreams(topology, props);
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> …Run Code Online (Sandbox Code Playgroud) 我有一个 3 节点 Zookeeper 集群版本 3.4.11 和 2 节点 Kafka 集群版本 0.11.3。我们编写了一个生产者,将消息发送到Kafka集群的特定主题和分区(我之前做过,并且对生产者进行了测试)。以下是经纪人配置:
broker.id=1
listeners=PLAINTEXT://node1:9092
num.partitions=24
delete.topic.enable=true
default.replication.factor=2
log.dirs=/data
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
log.retention.hours=168
zookeeper.session.timeout.ms=40000
zookeeper.connection.timeout.ms=10000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
Run Code Online (Sandbox Code Playgroud)
一开始,经纪人上没有主题,它们会自动创建。当我启动生产者时,Kafka 集群表现出奇怪的行为:
1-它创建了所有主题,但虽然生成数据的速率为每秒 10KB,但在不到一分钟的时间内,每个代理的日志目录从零数据变为 9.0 GB 数据!并且所有代理都关闭(因为缺乏日志目录容量)
2-就在开始生成数据时,我尝试使用控制台消费者使用数据,但它只是出错
WARN Error while fetching metadata with correlation id 2 : {Topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Run Code Online (Sandbox Code Playgroud)
3-以下是经纪商日志中反复出现的错误:
INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: Topic6-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
WARN Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log) …Run Code Online (Sandbox Code Playgroud) “log.retention.bytes”是我们用来保留主题消息日志的参数,我给定的值为 1073741824。
我参考了 Kafka 文档,其中提到“log.retention.bytes”中给出的大小是每个分区,所以这意味着假设如果我为我正在使用的所有主题有 20 个分区,那么 Kafka 的总字节大小根据文档,retain 是 20*1073741824 。
但我需要的是清晰的是
Will Kafka retain 20*1073741824 bytes for all the topics?
(or)
Will Kafka retain 20*1073741824 bytes per topic?
Run Code Online (Sandbox Code Playgroud) 如何将cassandra集成为apache Kafka中的数据生产者?
kafka config 有什么具体配置吗?
我是 Golang 和 Kafka 的新手,我正在使用 Segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。截至目前,我想推送 Kafka 中用户的每个事件,所以我想推送单个消息(而不是批量),但由于该库提供的写入操作对于批量或单个消息需要相同的时间,因此需要很多时间。有没有什么方法可以快速编写单条消息,以便我可以在更短的时间内推送卡夫卡中的数百万个事件?
我已经对单条消息和批量消息进行了测试,它花费相同的时间(最短为 10 毫秒)。
对包含逻辑删除值的 ktable 执行 groupby 时会发生什么?似乎groupby没有被评估,但是tombstone会像filter方法一样被转发吗?
我是卡夫卡新手,正在学习它。我只是在为员工汇总数据,但遇到了问题。有人可以帮忙吗?
我有一个主题 timeoffs,其中包含 time_off_id 键和类型对象的值,其中还包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是关键,值应该是该员工的休假列表。但我遵循以下方法,但遇到了问题。聚合数据时,提示方法引用中的返回类型错误:无法将 ArrayList 转换为 VR。你能帮助我吗。
代码:
KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
(key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
aggValue.add(newValue);
return aggValue;
}, Materialized.as("NewStore"));
Run Code Online (Sandbox Code Playgroud)
我也尝试过这种方法,但这并没有解决问题。
TimeOffList 类:
package com.kafka.productiontest.models;
import java.util.ArrayList;
public class TimeOffList {
ArrayList list = new ArrayList<TimeOff>();
public TimeOffList add(Object s) {
list.add(s);
return this;
}
}
Run Code Online (Sandbox Code Playgroud)
在流媒体类中:
groupedTable.aggregate(TimeOffList::new,
(k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
Run Code Online (Sandbox Code Playgroud)
实施您的解决方案后,这个问题消失了,但现在面临 serde 的问题。我已经实现了 TimeOffListSerde。请检查下面的代码
KStream<String, TimeOff> source …Run Code Online (Sandbox Code Playgroud) 我目前有一个 3 节点 Kafka 集群,它连接到我的 Zookeeper 整体中的基本 chroot 路径。
zookeeper.connect=172.12.32.123:2181,172.11.43.211:2181,172.18.32.131:2181
Run Code Online (Sandbox Code Playgroud)
现在,我想添加一个新的 5 节点 Kafka 集群,它将连接到同一个 Zookeeper 集合中的其他一些 chroot 路径。
zookeeper.connect=172.12.32.123:2181,172.11.43.211:2181,172.18.32.131:2181/cluster/2
Run Code Online (Sandbox Code Playgroud)
这些配置是否会像两个 chroot 的相对路径一样工作?据我了解,原始 Kafka 集群应该连接到基本 chroot 路径以外的其他路径上,以实现更好的隔离。
另外,跨 Kafka 集群拥有相同的 Zookeeper 集合是否很好?该文档表示,通常最好为不同的集群提供隔离的 Zookeeper 集合。
有些地方提到Kafka是发布订阅消息传递。其他消息来源提到 Kafka 是消息队列。请问它们之间有什么区别,Kakfa可以用作数据库吗?
我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames),但如何返回多个KeyValue对,以便我稍后可以使用分支发布到受尊重的主题?
有一种方法可以将数据转发到下游,但如何再次使用该数据?
卡夫卡版本 - 1.1.0