我有2个Kafka主题从不同来源流式传输完全相同的内容,因此我可以获得高可用性,以防其中一个来源失败.我正在尝试使用Kafka Streams 0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复.
使用leftJoinKStream 的方法时,其中一个主题可以没有问题(次要主题),但是当主要主题发生故障时,不会向输出主题发送任何内容.这似乎是因为,根据Kafka Streams开发人员指南,
KStream-KStream leftJoin始终由来自主流的记录驱动
因此,如果没有来自主流的记录,它将不会使用辅助流中的记录,即使它们存在.主流重新联机后,输出将恢复正常.
我也尝试使用outerJoin(添加重复记录),然后转换为KTable和groupByKey以消除重复,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
Run Code Online (Sandbox Code Playgroud)
但我偶尔也会重复一遍.我也commit.interval.ms=200用来让KTable经常发送到输出流.
接近此合并以从多个相同的输入主题获得一次输出的最佳方法是什么?
我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:
2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH.
Run Code Online (Sandbox Code Playgroud)
我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。
知道如何解决吗?
我的流有一个名为'category'的列,我在另一个商店中为每个'category'提供了额外的静态元数据,每隔几天就会更新一次.这种查找的正确方法是什么?Kafka流有两种选择
在Kafka Streams之外加载静态数据,仅用于KStreams#map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.
将元数据加载到Kafka主题,将其加载到a KTable和do KStreams#leftJoin(),这似乎更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.
上述哪一种是查找元数据的正确方法?
是否可以始终强制在重新启动时从头开始只读取一个流,这样就可以加载所有元数据KTable.
还有其他方式使用商店吗?
对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制是否提交.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka将多次重新发送此消息,直到问题得到解决.
但是如何控制在使用其更高级别的流DSL API时是否提交消息?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
通过在Kafka 0.11中添加Headers到记录(ProducerRecord和ConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录key和value记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.
恩.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
Run Code Online (Sandbox Code Playgroud)
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
Run Code Online (Sandbox Code Playgroud) 我有一个需要收听多个不同主题的应用程序; 每个主题都有关于如何处理消息的单独逻辑.我曾想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到的错误如下所示.
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
Run Code Online (Sandbox Code Playgroud)
代码(kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder() …Run Code Online (Sandbox Code Playgroud) 我的应用程序有一些聚合/窗口操作,所以它有一些存储在其中的状态存储state.dir.AFAIK,它还将状态存储的更改日志写入代理,那么可以将Kafka Stream应用程序视为无状态POD吗?
对于大数据中的许多情况,最好一次处理一小块记录缓冲区,而不是一次处理一条记录。
自然的例子是调用一些支持批处理以提高效率的外部 API。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。
到目前为止,我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
Run Code Online (Sandbox Code Playgroud)
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
Run Code Online (Sandbox Code Playgroud)
在 Scala 和 Akka Streams 中,该函数被称为groupedor batch。在 Spark Structured Streaming 中,我们可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))。
我试图了解 Connect 为您购买了什么而 Streams 没有。我们有应用程序的一部分,我们想在其中使用一个主题并写入 mariadb。
我可以用一个简单的处理器来完成这个。读取记录,存储在状态存储中,然后批量插入到 mariadb 中。
为什么这是一个坏主意?JDBC Sink Connector 给你带来了什么?