标签: apache-kafka

尝试运行简单的 Kafka Stream 应用程序时收到异常

我一直在尝试使用 Kafka 运行一个简单的字数统计应用程序,但是每当我运行它时,我都会收到以下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/LogContext
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:630)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:610)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:557)
    at StreamsApp.main(StreamsApp.java:49)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.LogContext
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
Run Code Online (Sandbox Code Playgroud)

我不知道为什么我不断收到此错误...主要方法的代码列在下面。(第 49 行)KafkaStreams Streams = new KafkaStreams(topology, props);

public static void main(final String[] args) throws Exception {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
7147
查看次数

Kafka在创建新主题时会产生大量额外数据

我有一个 3 节点 Zookeeper 集群版本 3.4.11 和 2 节点 Kafka 集群版本 0.11.3。我们编写了一个生产者,将消息发送到Kafka集群的特定主题和分区(我之前做过,并且对生产者进行了测试)。以下是经纪人配置:

broker.id=1
listeners=PLAINTEXT://node1:9092
num.partitions=24
delete.topic.enable=true
default.replication.factor=2
log.dirs=/data
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
log.retention.hours=168
zookeeper.session.timeout.ms=40000
zookeeper.connection.timeout.ms=10000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
Run Code Online (Sandbox Code Playgroud)

一开始,经纪人上没有主题,它们会自动创建。当我启动生产者时,Kafka 集群表现出奇怪的行为:

1-它创建了所有主题,但虽然生成数据的速率为每秒 10KB,但在不到一分钟的时间内,每个代理的日志目录从零数据变为 9.0 GB 数据!并且所有代理都关闭(因为缺乏日志目录容量)

2-就在开始生成数据时,我尝试使用控制台消费者使用数据,但它只是出错

WARN Error while fetching metadata with correlation id 2 : {Topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Run Code Online (Sandbox Code Playgroud)

3-以下是经纪商日志中反复出现的错误:

INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: Topic6-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
WARN Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka

0
推荐指数
1
解决办法
1921
查看次数

在 Apache Kafka 中记录主题数据时使用 log.retention.bytes 参数的混乱

“log.retention.bytes”是我们用来保留主题消息日志的参数,我给定的值为 1073741824。

我参考了 Kafka 文档,其中提到“log.retention.bytes”中给出的大小是每个分区,所以这意味着假设如果我为我正在使用的所有主题有 20 个分区,那么 Kafka 的总字节大小根据文档,retain 是 20*1073741824 。

但我需要的是清晰的是

Will Kafka retain 20*1073741824 bytes for all the topics?
                     (or)
Will Kafka retain 20*1073741824 bytes per topic?
Run Code Online (Sandbox Code Playgroud)

apache-kafka

0
推荐指数
1
解决办法
1866
查看次数

Cassandra 作为 kafka 中的数据源/生产者

如何将cassandra集成为apache Kafka中的数据生产者?

kafka config 有什么具体配置吗?

cassandra apache-kafka

0
推荐指数
1
解决办法
1304
查看次数

我们如何在kafka中快速写入单个消息(而不是批量)?

我是 Golang 和 Kafka 的新手,我正在使用 Segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。截至目前,我想推送 Kafka 中用户的每个事件,所以我想推送单个消息(而不是批量),但由于该库提供的写入操作对于批量或单个消息需要相同的时间,因此需要很多时间。有没有什么方法可以快速编写单条消息,以便我可以在更短的时间内推送卡夫卡中的数百万个事件?

我已经对单条消息和批量消息进行了测试,它花费相同的时间(最短为 10 毫秒)。

go apache-kafka segment-io kafka-producer-api

0
推荐指数
1
解决办法
3129
查看次数

对墓碑值执行分组依据

对包含逻辑删除值的 ktable 执行 groupby 时会发生什么?似乎groupby没有被评估,但是tombstone会像filter方法一样被转发吗?

apache-kafka apache-kafka-streams ksqldb

0
推荐指数
1
解决办法
752
查看次数

如何理解kafka流聚合?

我是卡夫卡新手,正在学习它。我只是在为员工汇总数据,但遇到了问题。有人可以帮忙吗?

我有一个主题 timeoffs,其中包含 time_off_id 键和类型对象的值,其中还包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是关键,值应该是该员工的休假列表。但我遵循以下方法,但遇到了问题。聚合数据时,提示方法引用中的返回类型错误:无法将 ArrayList 转换为 VR。你能帮助我吗。

代码:

KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
    (key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
  aggValue.add(newValue);
  return aggValue;
}, Materialized.as("NewStore"));
Run Code Online (Sandbox Code Playgroud)

我也尝试过这种方法,但这并没有解决问题。

TimeOffList 类:

package com.kafka.productiontest.models;

import java.util.ArrayList;

public class TimeOffList {
  ArrayList list = new ArrayList<TimeOff>();

  public TimeOffList add(Object s) {
    list.add(s);
    return this;
  }
}
Run Code Online (Sandbox Code Playgroud)

在流媒体类中:

groupedTable.aggregate(TimeOffList::new,
    (k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
Run Code Online (Sandbox Code Playgroud)

实施您的解决方案后,这个问题消失了,但现在面临 serde 的问题。我已经实现了 TimeOffListSerde。请检查下面的代码

KStream<String, TimeOff> source …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2357
查看次数

单个 Zookeeper 集合上的多个 kafka 集群

我目前有一个 3 节点 Kafka 集群,它连接到我的 Zookeeper 整体中的基本 chroot 路径。

zookeeper.connect=172.12.32.123:2181,172.11.43.211:2181,172.18.32.131:2181
Run Code Online (Sandbox Code Playgroud)

现在,我想添加一个新的 5 节点 Kafka 集群,它将连接到同一个 Zookeeper 集合中的其他一些 chroot 路径。

zookeeper.connect=172.12.32.123:2181,172.11.43.211:2181,172.18.32.131:2181/cluster/2
Run Code Online (Sandbox Code Playgroud)

这些配置是否会像两个 chroot 的相对路径一样工作?据我了解,原始 Kafka 集群应该连接到基本 chroot 路径以外的其他路径上,以实现更好的隔离。

另外,跨 Kafka 集群拥有相同的 Zookeeper 集合是否很好?该文档表示,通常最好为不同的集群提供隔离的 Zookeeper 集合。

apache-kafka apache-zookeeper

0
推荐指数
1
解决办法
3097
查看次数

Kafka是消息队列吗?Kafka可以用作数据库吗?

有些地方提到Kafka是发布订阅消息传递。其他消息来源提到 Kafka 是消息队列。请问它们之间有什么区别,Kakfa可以用作数据库吗?

apache-kafka

0
推荐指数
1
解决办法
1274
查看次数

使用Kafka Stream中的状态存储(RocksDB)将一条记录转换为多条记录

我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames),但如何返回多个KeyValue对,以便我稍后可以使用分支发布到受尊重的主题?

有一种方法可以将数据转发到下游,但如何再次使用该数据?

卡夫卡版本 - 1.1.0

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
1692
查看次数