标签: apache-kafka-streams

是否可以使用Kafka Streams访问邮件头?

通过在Kafka 0.11中添加Headers到记录(ProducerRecordConsumerRecord),是否可以在使用Kafka Streams处理主题时获取这些标题?当调用类似mapon的方法时,KStream它提供了记录keyvalue记录的参数但我无法看到访问它的方法headers.如果我们能够map超越ConsumerRecords ,那就太好了.

恩.

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 
Run Code Online (Sandbox Code Playgroud)

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

9
推荐指数
1
解决办法
4984
查看次数

如何在Kubernetes上部署Kafka Stream应用程序?

我的应用程序有一些聚合/窗口操作,所以它有一些存储在其中的状态存储state.dir.AFAIK,它还将状态存储的更改日志写入代理,那么可以将Kafka Stream应用程序视为无状态POD吗?

apache-kafka kubernetes apache-kafka-streams

9
推荐指数
1
解决办法
2393
查看次数

KStream批处理窗口

我想用KStream接口批量消息.

我有一个带有键/值的Stream我试图在翻滚窗口中收集它们,然后我想立即处理完整的窗口.

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid, value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {
Run Code Online (Sandbox Code Playgroud)

事情是每次更新到KTable时调用foreach.一旦完成,我想处理整个窗口.如从100毫秒收集数据,然后立即处理.在每个.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

8
推荐指数
2
解决办法
2759
查看次数

Kafka KStreams - 处理超时

我试图<KStream>.process()用a TimeWindows.of("name", 30000)来批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.

我已经尝试提高轮询频率和提交间隔以避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
Run Code Online (Sandbox Code Playgroud)

不幸的是,这些错误仍在发生:

(很多这些)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Run Code Online (Sandbox Code Playgroud)

其次是:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
6490
查看次数

Kafka Stream StateStore是全球所有实例还是本地实体?

在Kafka Stream WordCount示例中,它用于StateStore存储字数.如果同一个使用者组中有多个实例,那么该组StateStore是全局的,还是只是消费者实例的本地实例?

Thnaks

apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
6621
查看次数

KStreams + Spark Streaming +机器学习

我正在做一个POC,用于在数据流上运行机器学习算法.
我最初的想法是获取数据,使用

Spark Streaming - >来自多个表的聚合数据 - >在数据流上运行MLLib - >生成输出.

但是我穿过KStreams.现在我很困惑!

问题:
1.Spark Streaming和Kafka Streaming有什么区别?
2.我怎样才能结婚KStreams + Spark Streaming +机器学习?
我的想法是连续培训测试数据,而不是进行批量培训.

machine-learning apache-kafka apache-spark spark-streaming apache-kafka-streams

8
推荐指数
2
解决办法
5734
查看次数

如何管理Kafka KStream到Kstream窗口加入?

基于apache Kafka docs KStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者,例如,我们可以将数据保留1个月,但过去一周加入流?

是否有任何好的示例来显示窗口化的KStream-to-kStream窗口连接?

在我的情况,让我们说我有2 KStream,kstream1并且kstream2我希望能够加入十天kstream1至30天kstream2.

apache-kafka apache-kafka-streams

8
推荐指数
2
解决办法
6024
查看次数

无法检索Kafka Streams中的状态存储键的元数据

我正在尝试使用Kafka Streams和分布在两个实例中的状态存储.以下是商店和相关KTable的定义方式:

KTable<String, Double> userBalancesTable = kStreamBuilder.table(
        "balances-table",
        Consumed.with(String(), Double()),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(BALANCES_STORE).withKeySerde(String()).withValueSerde(Double())
);
Run Code Online (Sandbox Code Playgroud)

接下来,我有一些流处理逻辑,它将一些数据聚合到这个balance-tableKTable:

transactionsStream
        .leftJoin(...)
        ...
        .aggregate(...)
        .to("balances-table", Produced.with(String(), Double()));
Run Code Online (Sandbox Code Playgroud)

在某些时候,我是从REST处理程序查询状态存储.

ReadOnlyKeyValueStore<String, Double> balances = streams.store(BALANCES_STORE, QueryableStoreTypes.<String, Double>keyValueStore());
return Optional.ofNullable(balances.get(userId)).orElse(0.0);
Run Code Online (Sandbox Code Playgroud)

哪个工作完美 - 只要我有一个流处理实例.

现在,我正在添加第二个实例(注意:我的主题都有2个分区).如文档所述,状态存储BALANCES_STORE基于每个记录的密钥在实例之间分配(在我的示例中,密钥是用户ID).因此,一个实例必须:

  1. 调用以KafkaStreams#metadataForKey发现哪个实例正在处理包含要检索的键的状态存储的一部分

  2. 对此实例进行RPC(例如REST)调用以检索它

我的问题是调用KafkaStreams#metadataForKey始终返回一个空元数据对象.但是,KafkaStreams#allMetadataForStore()返回包含两个实例的元数据对象.所以它的行为就像它不知道我正在查询的密钥,虽然在州商店的工作中查找它.

调试

附加说明:我的application.server属性设置正确.

谢谢!

apache-kafka apache-kafka-streams

8
推荐指数
0
解决办法
531
查看次数

KafkaStreams毫无例外地关闭

我有四个使用相同应用程序ID运行的kafkastream应用程序实例。所有输入主题都是单个分区,为了实现可伸缩性,我将其传递给具有多个分区的中间虚拟主题。我已将request.timeout.ms设置为4分钟。

kafka实例进入ERROR状态,没有引发任何异常。很难弄清楚到底是什么问题。有任何想法吗?

[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:939 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Shutting down
[INFO ] 2018-01-09 12:30:11.579 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
[INFO ] 2018-01-09 12:30:11.595 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:972 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] Stream thread shutdown complete
[INFO ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] StreamThread:888 - stream-thread [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
[WARN ] 2018-01-09 12:30:11.605 [app-new-03-cb952917-bd06-4932-8c7e-62986126a5b4-StreamThread-1] KafkaStreams:343 - stream-client …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

8
推荐指数
1
解决办法
1682
查看次数

Kafka Stream中幂等与正好一次的区别

我正在查看文档,据我了解,通过启用以下功能,我们可以实现一次准确的交易 idempotence=true

幂等:幂等生产者针对单个主题为生产者启用一次。基本上,每条发送的邮件都有垃圾保证,在出现错误的情况下不会重复

那么,如果我们已经具有幂等性,那为什么我们要在Kafka Stream中一次只需要另一个属性呢?幂等与完全一次之间到底有什么不同

为什么普通Kafka Producer中不存在一次完全属性?

apache-kafka apache-kafka-streams

8
推荐指数
2
解决办法
193
查看次数