我正在尝试使用处理器 api 创建 kafka 流,因为我有自定义流程。当我的处理器列出具有不同分区数量的多个主题时,我遇到了问题。我意识到我需要创建一个全球状态商店。我需要知道如何在监听多个主题的同时添加处理器并添加全局状态存储?
任何代码示例或链接都会有所帮助。谢谢你!!
我正在尝试根据 KafkaStreams 的惯例和合理性做出实际的设计决策。
假设我有两个不同的事件要放入KTables 中。我有一个生产者将这些消息发送给KStream正在收听该主题的生产者。
据我所知,我不能对使用 的消息使用条件转发KafkaStreams,因此如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to一个接收器主题 - 否则,我会做一些事情,比如foreach在流上调用并将带有 a 的消息发送KProducer到接收器主题。
以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 的两个实例时KafkaStreams,只有第一个初始化订阅它的主题 - 另一个得到一个来自客户端的警告,它的主题没有订阅。
我可以在同一个应用程序中设置多个流吗?如果有,有什么特殊要求吗?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams …Run Code Online (Sandbox Code Playgroud) 我们正在使用 Kafka,并希望使用交互式查询来访问我们状态存储中的数据。我们有一个现有的服务,它使用 Akka HTTP 来提供 REST API,我们希望将交互式查询集成到流程中。
似乎kafka-streams-query非常适合这个。但是,它通过公开route使用低级 API的属性集成到 Akka HTTP 中,该属性映射到Flow[HttpRequest, HttpResponse, Any]. 我们之前的所有代码都使用 Akka HTTP 的路由 DSL 连接代码。
我希望像下面这样的代码可以工作,但它没有:
implicit val system:ActorSystem = ActorSystem("web")
implicit val materializer:ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher
val firstRoutes:Route = ... //a route object populated
val lastRoutes:Route = ... //other route object populad
val iqServiceFlow:Flow[HttpRequest, HttpResponse, Any] = ...// code that returns the interactive query service
val firstFlow = Route.handlerFlow(firstRoutes)
val lastFlow = …Run Code Online (Sandbox Code Playgroud) 我一直在尝试使用 Kafka 运行一个简单的字数统计应用程序,但是每当我运行它时,我都会收到以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/LogContext
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:630)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:610)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:557)
at StreamsApp.main(StreamsApp.java:49)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.LogContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
Run Code Online (Sandbox Code Playgroud)
我不知道为什么我不断收到此错误...主要方法的代码列在下面。(第 49 行)KafkaStreams Streams = new KafkaStreams(topology, props);
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> …Run Code Online (Sandbox Code Playgroud) 状态完整操作如何在具有多个实例的Kafka Stream应用程序中工作?让我们说我们有2个主题,每个A和B有2个分区.我们有一个流应用程序,它既消耗了两个主题,又有两个流之间的连接.
现在我们正在运行此流应用程序的2个实例.据我所知,每个实例将分配每个主题的2个分区之一.
现在,如果要连接的消息被应用程序的不同实例使用,联接将如何发生?我无法理解它.
虽然我测试了一个似乎工作正常的小流应用程序.我是否可以在不考虑流应用程序中定义的拓扑类型的情况下,始终增加任何类型应用程序的实例数量?
是否有任何文件可以让我了解其工作细节?
我正在为带有架构注册表的 java kafka-stream 寻找适当的教程/指南。我有谷歌,但找不到合适的教程。如果有人能帮我找到至少合适的教程,我真的很感激
我可以通过谷歌搜索找到 kafka 流教程。但我正在寻找带有 schemaregistry 的 kafka 流
java avro apache-kafka apache-kafka-streams confluent-schema-registry
对包含逻辑删除值的 ktable 执行 groupby 时会发生什么?似乎groupby没有被评估,但是tombstone会像filter方法一样被转发吗?
我是卡夫卡新手,正在学习它。我只是在为员工汇总数据,但遇到了问题。有人可以帮忙吗?
我有一个主题 timeoffs,其中包含 time_off_id 键和类型对象的值,其中还包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是关键,值应该是该员工的休假列表。但我遵循以下方法,但遇到了问题。聚合数据时,提示方法引用中的返回类型错误:无法将 ArrayList 转换为 VR。你能帮助我吗。
代码:
KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
(key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
aggValue.add(newValue);
return aggValue;
}, Materialized.as("NewStore"));
Run Code Online (Sandbox Code Playgroud)
我也尝试过这种方法,但这并没有解决问题。
TimeOffList 类:
package com.kafka.productiontest.models;
import java.util.ArrayList;
public class TimeOffList {
ArrayList list = new ArrayList<TimeOff>();
public TimeOffList add(Object s) {
list.add(s);
return this;
}
}
Run Code Online (Sandbox Code Playgroud)
在流媒体类中:
groupedTable.aggregate(TimeOffList::new,
(k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
Run Code Online (Sandbox Code Playgroud)
实施您的解决方案后,这个问题消失了,但现在面临 serde 的问题。我已经实现了 TimeOffListSerde。请检查下面的代码
KStream<String, TimeOff> source …Run Code Online (Sandbox Code Playgroud) 我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames),但如何返回多个KeyValue对,以便我稍后可以使用分支发布到受尊重的主题?
有一种方法可以将数据转发到下游,但如何再次使用该数据?
卡夫卡版本 - 1.1.0