标签: apache-kafka-streams

合并多个相同的Kafka Streams主题

我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以获得高可用性,以防其中一个来源失败.我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复.

使用leftJoinKStream 的方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容.这似乎是因为,根据Kafka Streams开发人员指南,

KStream-KStream leftJoin始终由来自主流的记录驱动

因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在.主流重新联机后,输出将恢复正常.

我也尝试使用outerJoin(添加重复记录),然后转换为KTable和groupByKey以消除重复,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)
Run Code Online (Sandbox Code Playgroud)

但我偶尔也会重复一遍.我也commit.interval.ms=200用来让KTable经常发送到输出流.

接近此合并以从多个相同的输入主题获得一次输出的最佳方法是什么?

java high-availability apache-kafka-streams

10
推荐指数
1
解决办法
4419
查看次数

卡夫卡INVALID_FETCH_SESSION_EPOCH

我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:

2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH. 
Run Code Online (Sandbox Code Playgroud)

我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。

知道如何解决吗?

apache-kafka spring-cloud-stream apache-kafka-streams

10
推荐指数
2
解决办法
1万
查看次数

使用查找数据丰富KStream的理想方式

我的流有一个名为'category'的列,我在另一个商店中为每个'category'提供了额外的静态元数据,每隔几天就会更新一次.这种查找的正确方法是什么?Kafka流有两种选择

  1. 在Kafka Streams之外加载静态数据,仅用于KStreams#map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.

  2. 将元数据加载到Kafka主题,将其加载到a KTable和do KStreams#leftJoin(),这似乎更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.

    • 例如,最初说只有一个类别'c1'.Kafka流应用程序优雅地停止,然后重新启动.重新启动后,添加了一个新类别"c2".我的假设是,table = KStreamBuilder().table('metadataTopic')只有值'c2',因为这是应用程序第二次启动以来唯一发生的变化.我希望它有'c1'和'c2'.
    • 如果它也有"c1",那么数据是否会从KTable中删除(可能是通过设置发送key = null消息?)?

上述哪一种是查找元数据的正确方法?

是否可以始终强制在重新启动时从头开始只读取一个流,这样就可以加载所有元数据KTable.

还有其他方式使用商店吗?

apache-kafka-streams

9
推荐指数
2
解决办法
3091
查看次数

使用Kafka Streams DSL时如何处理错误并且不提交

对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制是否提交.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka将多次重新发送此消息,直到问题得到解决.

但是如何控制在使用其更高级别的流DSL API时是否提交消息?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

apache-kafka kafka-consumer-api apache-kafka-streams

9
推荐指数
1
解决办法
3225
查看次数

是否可以使用Kafka Streams访问邮件头?

通过在Kafka 0.11中添加Headers到记录(ProducerRecordConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录keyvalue记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.

恩.

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 
Run Code Online (Sandbox Code Playgroud)

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

9
推荐指数
1
解决办法
4984
查看次数

Kafka Streams:使用相同的`application.id`来使用多个主题

我有一个需要收听多个不同主题的应用程序; 每个主题都有关于如何处理消息的单独逻辑.我曾想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到的错误如下所示.

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
Run Code Online (Sandbox Code Playgroud)

代码(kotlin)

class KafkaSetup() {
    companion object {
        private val LOG = LoggerFactory.getLogger(this::class.java)
    }

    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder() …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

9
推荐指数
1
解决办法
7180
查看次数

如何在Kubernetes上部署Kafka Stream应用程序?

我的应用程序有一些聚合/窗口操作,所以它有一些存储在其中的状态存储state.dir.AFAIK,它还将状态存储的更改日志写入代理,那么可以将Kafka Stream应用程序视为无状态POD吗?

apache-kafka kubernetes apache-kafka-streams

9
推荐指数
1
解决办法
2393
查看次数

如何使用 kafka 流以块/批次的形式处理数据?

对于大数据中的许多情况,最好一次处理一小块记录缓冲区,而不是一次处理一条记录。

自然的例子是调用一些支持批处理以提高效率的外部 API。

我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。

到目前为止,我有:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
Run Code Online (Sandbox Code Playgroud)

我想要的是:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
Run Code Online (Sandbox Code Playgroud)

在 Scala 和 Akka Streams 中,该函数被称为groupedor batch。在 Spark Structured Streaming 中,我们可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))

java scala apache-kafka apache-spark apache-kafka-streams

9
推荐指数
1
解决办法
7921
查看次数

Kafka Streams 处理器 - 状态存储和输入主题分区

我想完全理解 kafka-streams 处理器必须遵守的关于处理器输入及其状态分区的规则。具体我想了解:

  1. 是否可能以及使用与输入主题的键不同的状态存储键的潜在后果是什么
  2. 状态存储键是否在分区之间共享,即如果我在处理器处理属于两个不同分区的记录时尝试访问处理器中的相同键,我是否会获得相同的值

我一直在对此进行一些研究,我发现的答案似乎不是很清楚,有时还相互矛盾:例如,这个似乎表明商店是完全独立的,您可以使用任何键,而这个则说您永远不应该使用具有与输入主题中的键不同的键的商店。

感谢您的澄清。

apache-kafka apache-kafka-streams

9
推荐指数
1
解决办法
1211
查看次数

Kafka Connect vs Streams for sinks

我试图了解 Connect 为您购买了什么而 Streams 没有。我们有应用程序的一部分,我们想在其中使用一个主题并写入 mariadb。

我可以用一个简单的处理器来完成这个。读取记录,存储在状态存储中,然后批量插入到 mariadb 中。

为什么这是一个坏主意?JDBC Sink Connector 给你带来了什么?

apache-kafka apache-kafka-streams apache-kafka-connect

9
推荐指数
1
解决办法
3695
查看次数