我想用KStream接口批量消息.
我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)
事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to …Run Code Online (Sandbox Code Playgroud) 在Kafka Stream WordCount示例中,它用于StateStore存储字数.如果同一个使用者组中有多个实例,那么该组StateStore是全局的,还是只是消费者实例的本地实例?
Thnaks
我正在寻找一种测试Kafka Streams应用程序的方法.这样我就可以定义输入事件,测试套件会显示输出.
没有真正的Kafka设置,这可能吗?
我有一个Kafka Streams应用程序消耗并生成具有3个代理的Kafka集群,复制因子为3.除了消费者偏移主题(50个分区)之外,所有其他主题每个只有一个分区.
当代理尝试首选副本时,Streams应用程序(运行在与代理完全不同的实例上)失败并显示错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
...
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Run Code Online (Sandbox Code Playgroud)
Streams应用程序尝试成为分区的领导者是正常的,因为它在不属于Kafka集群的服务器上运行?
我可以通过以下方式重现此行为:
bin/kafka-preferred-replica-election.sh --zookeeper localhost我的问题似乎与报告的失败类似,所以我想知道这是否是一个新的Kafka Streams错误.我的完整堆栈跟踪与报告的失败中链接的要点完全相同(此处).
另一个可能有趣的细节是,在领导者选举期间,我controller.log在经纪人中得到这些消息:
[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) …Run Code Online (Sandbox Code Playgroud) 我正在使用以下 log4j.properties
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Run Code Online (Sandbox Code Playgroud)
我只想禁用 kafka 的日志消息。其中显示我正在记录的日志消息。
有没有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码?我正在运行一个简单的 linesplit.java 代码,它从一个主题中获取流并将其拆分并将其发送到另一个主题,但我不知道在哪里保存调试指针以在每条消息流经 linesplit.java 时对其进行调试。
分割线
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
// ------- use the code below for Java 8 and uncomment the above ---
builder.stream("streams-input")
.flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
.to("streams-output");
// -----------------------------------------------------------------
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1); …Run Code Online (Sandbox Code Playgroud) 嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.
我现在做生产者和消费者我想实时流式传输.
我们应该永远调用processorContext.commit()在Processor执行由自己呢?我的意思是commit在计划的Punctuator实现或内部process方法中调用方法。
我们应该在哪些用例中这样做,我们是否需要这样做?这个问题与 Kafka DSL withtransform()和 Processor API 有关。
似乎 Kafka Streams 自己处理它,调用 processorContext.commit()也不能保证它会立即完成。
我注意到,该aggregate()阶段似乎在序列化/反序列化每个元素,即使它定期发出结果。
streamBuilder
.stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
.groupByKey(Serialized.`with`(keySerde, inputValueSerde))
.aggregate(
() => Snapshot.Initial(),
(_, event, prevSnap: Snapshot) => {
// ...
},
Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
)
.toStream()
Run Code Online (Sandbox Code Playgroud)
我希望键值存储可以在内存中工作,直到提交提交为止。看起来不仅为每个更新进行写操作,而且还存在反序列化的读操作。有人可以解释一下这是如何工作的,如果我应该关注表现吗?
我正在查看文档,据我了解,通过启用以下功能,我们可以实现一次准确的交易 idempotence=true
幂等:幂等生产者针对单个主题为生产者启用一次。基本上,每条发送的邮件都有垃圾保证,在出现错误的情况下不会重复
那么,如果我们已经具有幂等性,那为什么我们要在Kafka Stream中一次只需要另一个属性呢?幂等与完全一次之间到底有什么不同
为什么普通Kafka Producer中不存在一次完全属性?