eth*_*nny 1 java apache-kafka apache-kafka-streams
我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.
我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.
(原始文件长达数千行)
总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.
另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.
使用以下代码:
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
Run Code Online (Sandbox Code Playgroud)
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
Run Code Online (Sandbox Code Playgroud)
为了确保我的调试器没有减慢速度,我构建它并在与我的kafka进程相同的盒子上运行它.这次它甚至没有试图延迟4分钟或更长时间 - 在几秒钟内就输出了虚假的电话.punctuate().其中许多(大多数)没有干预.process().
更新:这部分答案是针对Kafka版本0.11或更早版本(对于Kafka 1.0及更高版本,见下文)
在卡夫卡流,标点符号是基于数据流的时间和不 系统时间(又名处理时间).
每个默认流时间是事件时间,即嵌入在Kafka记录中的时间戳.当你不设置非默认TimestampExtractor(参见timestamp.extractor在http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters),来电来punctuate只取决于事件的过程关于您处理的记录的时间.因此,如果您需要多个分钟来处理记录的"30秒"(事件时间),punctuate则会比30秒(挂钟时间)更频繁地调用...
这也可以解释您的不规则呼叫模式(即突发和长延迟).如果您的数据事件时间"跳转",并且您的主题中已经完全可以处理您要处理的数据,那么Kafka Streams也会"跳转"内部维护的流时间.
我认为,您可以通过使用来解决您的问题WallclockTimestampExtractor(请参阅http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor)
还有一点需要提及:流时间只有在处理数据时才会提前 - 如果您的应用程序到达输入主题的末尾并等待数据,punctuate则不会被调用.即使您使用,这也适用WallclockTimestampExtractor.
顺便说一句:目前有关于Streams标点符号行为的讨论:https://github.com/apache/kafka/pull/1689
回答Kafka 1.0及更高版本
从Kafka 1.0开始,可以根据挂钟时间或事件时间来注册标点符号:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2
| 归档时间: |
|
| 查看次数: |
1449 次 |
| 最近记录: |