通过在Kafka 0.11中添加Headers到记录(ProducerRecord和ConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录key和value记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.
恩.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
Run Code Online (Sandbox Code Playgroud)
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
Run Code Online (Sandbox Code Playgroud) 我的应用程序有一些聚合/窗口操作,所以它有一些存储在其中的状态存储state.dir.AFAIK,它还将状态存储的更改日志写入代理,那么可以将Kafka Stream应用程序视为无状态POD吗?
我想用KStream接口批量消息.
我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)
事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to …Run Code Online (Sandbox Code Playgroud) 我试图<KStream>.process()用a TimeWindows.of("name", 30000)来批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.
我已经尝试提高轮询频率和提交间隔以避免这种情况:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
Run Code Online (Sandbox Code Playgroud)
不幸的是,这些错误仍在发生:
(很多这些)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Run Code Online (Sandbox Code Playgroud)
其次是:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has …Run Code Online (Sandbox Code Playgroud) 在Kafka Stream WordCount示例中,它用于StateStore存储字数.如果同一个使用者组中有多个实例,那么该组StateStore是全局的,还是只是消费者实例的本地实例?
Thnaks
我正在做一个POC,用于在数据流上运行机器学习算法.
我最初的想法是获取数据,使用
Spark Streaming - >来自多个表的聚合数据 - >在数据流上运行MLLib - >生成输出.
但是我穿过KStreams.现在我很困惑!
问题:
1.Spark Streaming和Kafka Streaming有什么区别?
2.我怎样才能结婚KStreams + Spark Streaming +机器学习?
我的想法是连续培训测试数据,而不是进行批量培训.
machine-learning apache-kafka apache-spark spark-streaming apache-kafka-streams
基于apache Kafka docs KStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者,例如,我们可以将数据保留1个月,但过去一周加入流?
是否有任何好的示例来显示窗口化的KStream-to-kStream窗口连接?
在我的情况,让我们说我有2 KStream,kstream1并且kstream2我希望能够加入十天kstream1至30天kstream2.
我正在尝试使用Kafka Streams和分布在两个实例中的状态存储.以下是商店和相关KTable的定义方式:
KTable<String, Double> userBalancesTable = kStreamBuilder.table(
"balances-table",
Consumed.with(String(), Double()),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(BALANCES_STORE).withKeySerde(String()).withValueSerde(Double())
);
Run Code Online (Sandbox Code Playgroud)
接下来,我有一些流处理逻辑,它将一些数据聚合到这个balance-tableKTable:
transactionsStream
.leftJoin(...)
...
.aggregate(...)
.to("balances-table", Produced.with(String(), Double()));
Run Code Online (Sandbox Code Playgroud)
在某些时候,我是从REST处理程序查询状态存储.
ReadOnlyKeyValueStore<String, Double> balances = streams.store(BALANCES_STORE, QueryableStoreTypes.<String, Double>keyValueStore());
return Optional.ofNullable(balances.get(userId)).orElse(0.0);
Run Code Online (Sandbox Code Playgroud)
哪个工作完美 - 只要我有一个流处理实例.
现在,我正在添加第二个实例(注意:我的主题都有2个分区).如文档中所述,状态存储BALANCES_STORE基于每个记录的密钥在实例之间分配(在我的示例中,密钥是用户ID).因此,一个实例必须:
调用以KafkaStreams#metadataForKey发现哪个实例正在处理包含要检索的键的状态存储的一部分
对此实例进行RPC(例如REST)调用以检索它
我的问题是调用KafkaStreams#metadataForKey始终返回一个空元数据对象.但是,KafkaStreams#allMetadataForStore()返回包含两个实例的元数据对象.所以它的行为就像它不知道我正在查询的密钥,虽然在州商店的工作中查找它.
附加说明:我的application.server属性设置正确.
谢谢!
我有四个使用相同应用程序ID运行的kafkastream应用程序实例。所有输入主题都是单个分区,为了实现可伸缩性,我将其传递给具有多个分区的中间虚拟主题。我已将request.timeout.ms设置为4分钟。
kafka实例进入ERROR状态,没有引发任何异常。很难弄清楚到底是什么问题。有任何想法吗?
[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:939 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Shutting down
[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
[INFO ] 2018-01-09 12:30:11.595 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:972 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Stream thread shutdown complete
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
[WARN ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaStreams:343 - stream-client …Run Code Online (Sandbox Code Playgroud) 我正在查看文档,据我了解,通过启用以下功能,我们可以实现一次准确的交易 idempotence=true
幂等:幂等生产者针对单个主题为生产者启用一次。基本上,每条发送的邮件都有垃圾保证,在出现错误的情况下不会重复
那么,如果我们已经具有幂等性,那为什么我们要在Kafka Stream中一次只需要另一个属性呢?幂等与完全一次之间到底有什么不同
为什么普通Kafka Producer中不存在一次完全属性?