标签: apache-kafka-streams

KStream批处理窗口

我想用KStream接口批量消息.

我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid, value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)

事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

8
推荐指数
2
解决办法
2759
查看次数

Kafka Stream StateStore是全球所有实例还是本地实体?

在Kafka Stream WordCount示例中,它用于StateStore存储字数.如果同一个使用者组中有多个实例,那么该组StateStore是全局的,还是只是消费者实例的本地实例?

Thnaks

apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
6621
查看次数

测试Kafka Streams拓扑

我正在寻找一种测试Kafka Streams应用程序的方法.这样我就可以定义输入事件,测试套件会显示输出.

没有真正的Kafka设置,这可能吗?

testing apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
8026
查看次数

卡夫卡领导人选举导致Kafka Streams崩溃

我有一个Kafka Streams应用程序消耗并生成具有3个代理的Kafka集群,复制因子为3.除了消费者偏移主题(50个分区)之外,所有其他主题每个只有一个分区.

当代理尝试首选副本时,Streams应用程序(运行在与代理完全不同的实例上)失败并显示错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Run Code Online (Sandbox Code Playgroud)

Streams应用程序尝试成为分区的领导者是正常的,因为它在不属于Kafka集群的服务器上运行?

我可以通过以下方式重现此行为:

  1. 杀死其中一个经纪人(其他2个接管者作为所有分区的领导者,按照预期将杀死的经纪人作为他们的领导者)
  2. 让遇难的经纪人重新振作起来
  3. 触发首选副本领袖选举 bin/kafka-preferred-replica-election.sh --zookeeper localhost

我的问题似乎与报告的失败类似,所以我想知道这是否是一个新的Kafka Streams错误.我的完整堆栈跟踪与报告的失败中链接的要点完全相同(此处).

另一个可能有趣的细节是,在领导者选举期间,我controller.log在经纪人中得到这些消息:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
2022
查看次数

我们可以只为 kafka 禁用 log4j 日志吗

我正在使用以下 log4j.properties

log4j.rootLogger=DEBUG, stdout


log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Run Code Online (Sandbox Code Playgroud)

我只想禁用 kafka 的日志消息。其中显示我正在记录的日志消息。

apache-kafka apache-kafka-streams

8
推荐指数
2
解决办法
7934
查看次数

如何调试kafkastreams代码?

有没有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码?我正在运行一个简单的 linesplit.java 代码,它从一个主题中获取流并将其拆分并将其发送到另一个主题,但我不知道在哪里保存调试指针以在每条消息流经 linesplit.java 时对其进行调试。

分割线

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();



    // ------- use the code below for Java 8 and uncomment the above ---

    builder.stream("streams-input")
           .flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
           .to("streams-output");

     //  -----------------------------------------------------------------

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
2761
查看次数

Kafka Streams with Spring Boot

嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.

我现在做生产者和消费者我想实时流式传输.

apache-kafka spring-boot apache-kafka-streams spring-kafka

8
推荐指数
1
解决办法
1万
查看次数

Kafka Streams - 处理器上下文提交

我们应该永远调用processorContext.commit()Processor执行由自己呢?我的意思是commit在计划的Punctuator实现或内部process方法中调用方法。

我们应该在哪些用例中这样做,我们是否需要这样做?这个问题与 Kafka DSL withtransform()和 Processor API 有关。

似乎 Kafka Streams 自己处理它,调用 processorContext.commit()也不能保证它会立即完成。

apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
2317
查看次数

Kafka Streams聚合阶段是否对每个元素进行序列化和反序列化?

我注意到,该aggregate()阶段似乎在序列化/反序列化每个元素,即使它定期发出结果。

  streamBuilder
      .stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
      .groupByKey(Serialized.`with`(keySerde, inputValueSerde))
      .aggregate(
        () => Snapshot.Initial(),
        (_, event, prevSnap: Snapshot) => {
          // ...
        },
        Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
      )
      .toStream()
Run Code Online (Sandbox Code Playgroud)

我希望键值存储可以在内存中工作,直到提交提交为止。看起来不仅为每个更新进行写操作,而且还存在反序列化的读操作。有人可以解释一下这是如何工作的,如果我应该关注表现吗?

apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
41
查看次数

Kafka Stream中幂等与正好一次的区别

我正在查看文档,据我了解,通过启用以下功能,我们可以实现一次准确的交易 idempotence=true

幂等:幂等生产者针对单个主题为生产者启用一次。基本上,每条发送的邮件都有垃圾保证,在出现错误的情况下不会重复

那么,如果我们已经具有幂等性,那为什么我们要在Kafka Stream中一次只需要另一个属性呢?幂等与完全一次之间到底有什么不同

为什么普通Kafka Producer中不存在一次完全属性?

apache-kafka apache-kafka-streams

8
推荐指数
2
解决办法
193
查看次数