标签: apache-kafka-streams

TopologyTestDriver 在 KTable 聚合上发送错误消息

我有一个聚合在 KTable 上的拓扑。这是我创建的通用方法,用于根据我拥有的不同主题构建此拓扑。

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}
Run Code Online (Sandbox Code Playgroud)

这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时则不然。

在这两种情况下,当我获得更新时,subtractor首先调用 ,然后adder调用 。问题是,使用 时TopologyTestDriver,会发送两条消息以进行更新:一条在调用后subtractor,另一条在adder调用后。更不用说在之后subrtractor和之前发送的消息adder处于不正确的阶段。

其他人可以确认这是一个错误吗?我已经针对 …

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
688
查看次数

KTable 应该发出的事件

我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),所以我没有使用TopologyTestDriver.

我的拓扑具有键值类型的输入String -> CustomerString -> CustomerMapped. Serdes、模式以及与模式注册表的集成都按预期工作。

我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和kafka-streams-scala. 我的拓扑尽可能简化,如下所示:

val otherBuilder = new StreamsBuilder()

otherBuilder
     .table[String,Customer](source)
     .mapValues(c => CustomerMapped(c.surname, c.age))
     .toStream.to(target)   
Run Code Online (Sandbox Code Playgroud)

(所有隐式 serdes、ProducedConsumed等都是默认的,并且可以正确找到)

我的测试包括同步且不间断地向主题发送一些记录 ( data) ,然后从主题读回,我将结果与:sourcetargetexpected

val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
570
查看次数

Apache Kafka 状态存储

我正在学习Apache Kafka(作为消息传递系统),并在这个过程中了解了术语StateStore链接在这里

我还知道Apache kafka streams客户端 API。

适用StateStoreApache kafka消息传递系统的上下文或适用于Apache Kafka Streams.

是否Apache有自己的“自己”实现StateStore或使用第三方实现(例如,rockdsb.

谁能帮助我理解这一点。

apache-kafka rocksdb apache-kafka-streams

2
推荐指数
2
解决办法
7411
查看次数

为什么我会收到此编译错误:“无法找到 kstream.Consumed 的隐式值”以及如何修复它?

我们有这些依赖关系:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"
Run Code Online (Sandbox Code Playgroud)

我们使用代码生成器从 AVRO 模式文件生成 Scala 案例类。一个这样生成的案例类具有 Either 值作为其字段之一。在 AVRO 模式中,这是用 type=[t1,t2] 表示的,因此生成看起来不错,这是一个总和类型:可以是类型 t1 或类型 t2。

问题变成从主题到案例类(二进制 -> Avro Map -> 案例类)的反序列化路径中缺少什么。

基本上我目前收到此错误:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")
Run Code Online (Sandbox Code Playgroud)

第一个想法是 kafka-streams-avro-serde,但可能这个库只确保 AVRO …

scala jsonschema avro apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1712
查看次数

使用 KSTREAM 或 KSQL 将 JSON 数组转换为 JSON 对象

我有以下格式的数据进入 Kafka。

{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

Run Code Online (Sandbox Code Playgroud)

我希望它像这样转换。

{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}
Run Code Online (Sandbox Code Playgroud)

我尝试使用 ksql 进行扁平化,但 ksql 还不支持数组。

我尝试kstream使用以下代码进行扁平化。

builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);
Run Code Online (Sandbox Code Playgroud)

但它没有产生任何输出。对此的任何帮助将不胜感激。

java apache-kafka apache-kafka-streams confluent-platform ksqldb

2
推荐指数
1
解决办法
1968
查看次数

Kafka重新分区(基于key的group by)

当我们基于某个键对流应用 group by 函数时,kafka 如何计算它,因为相同的键可能存在于不同的分区中?我看到 through() 函数基本上对数据进行了重新分区,但我不明白它是什么意思。它将具有相同密钥的所有消息移动到单个分区中吗?另外我们多久可以调用 through() 方法?如果有需求的话,我们可以在收到每条消息后调用它吗?请建议。谢谢

apache-kafka apache-kafka-streams ktable

2
推荐指数
1
解决办法
2670
查看次数

kafka 流状态存储 max.request.size 参数问题

我们在项目中使用Kafka流状态存储,并且我们想要存储超过1MB的数据,但是我们遇到了以下异常:

该消息序列化后为 1760923 字节,大于您使用 max.request.size 配置配置的最大请求大小。

然后我点击链接添加前缀到 StreamsConfig 以启用设置默认内部主题配置并添加以下配置:

topic.max.request.size=50000000
Run Code Online (Sandbox Code Playgroud)

然后应用程序工作正常,并且当状态存储内部主题已创建时它可以正常工作,但是当 Kafka 重新启动并且状态存储主题已丢失/删除时,Kafka 流处理器需要在启动时自动创建内部状态存储主题应用程序,此时,它抛出异常,其中显示:

"Aorg.apache.kafka.streams.errors.StreamsException: Could not create topic data-msg-seq-state-store-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)....
.....
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: max.request.size".
Run Code Online (Sandbox Code Playgroud)

解决方案是我们可以手动创建内部主题,但这应该不是一个好的解决方案。

你能帮我解决这个问题吗?如果我错过了任何配置?

非常感谢。

2020年6月17日更新:仍然没有解决问题。任何人都可以帮忙吗?

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
3806
查看次数

如何使用标点符号从状态存储中删除旧记录?(卡夫卡)

Ktable我使用 为主题创建了streamsBuilder.table("myTopic"),并将其具体化为状态存储,以便我可以使用交互式查询。

每小时,我都想从该状态存储(以及关联的变更日志主题)中删除其值在过去一小时内尚未更新的记录。

我相信使用标点符号可以实现这一点,但到目前为止我只使用过 DSL,因此不确定如何继续。如果有人能为我提供一个例子,我将非常感激。

谢谢,

杰克

java apache-kafka apache-kafka-streams ktable

2
推荐指数
1
解决办法
2236
查看次数

将数据添加到状态存储以进行状态处理和容错

我有一个执行一些状态处理的微服务。应用程序从输入主题构建 KStream,进行一些状态处理,然后将数据写入输出主题。

我将在同一组中运行 3 个该应用程序。当微服务发生故障时,我需要存储 3 个参数,接管的微服务可以查询共享状态存储并从崩溃的服务停止的地方继续。

我正在考虑将这 3 个参数推送到状态存储中,并在其他微服务接管时查询数据。从我的研究中,我看到了很多人们使用状态存储执行事件计数的例子,但这并不完全是我想要的,有谁知道一个例子或者解决这个问题的正确方法是什么?

stateful apache-kafka rocksdb apache-kafka-streams

2
推荐指数
1
解决办法
1626
查看次数

Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams

我正在尝试使用 Kafka Streams 做一个简单的 POC。但是,我在启动应用程序时遇到异常。我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring boot 2.3.5 Kafka 流配置

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka spring-cloud-stream-binder-kafka

2
推荐指数
1
解决办法
4425
查看次数