标签: apache-kafka-streams

Kafka 流状态目录 io 错误

流运行一定时间后给出以下错误?我找不到谁负责创建 .sst 文件?

环境:

卡夫卡版本 0.10.0-cp1

Scala 2.11.8

    org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
        at org.rocksdb.RocksDB.flush(Native Method)
        at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
        ... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka rocksdb apache-kafka-streams

1
推荐指数
1
解决办法
2764
查看次数

Kafka Streams:一条记录​​到多条记录

鉴于:我在Kafka中有两个主题让我们说主题A和主题B.Kafka Stream从主题A读取记录,处理它并产生与消费记录对应的多个记录(比如说记录A和记录B).现在,问题是如何使用Kafka Streams实现这一目标.

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()
Run Code Online (Sandbox Code Playgroud)

这里,读取的记录是Message; 处理完毕后,返回Message列表.如何将此列表分成两个生产者流?任何帮助将不胜感激.

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

1
推荐指数
1
解决办法
4488
查看次数

KafkaStreams如何确定自举时GlobalKTable是否已完全填充?

我用来创建GlobalKTable的主题非常活跃。在KStream-GlobalKTable连接的文档中,我阅读了

GlobalKTable是在完全自举(重)启动的KafkaStreams情况下,这意味着表是用下面的话题,可在启动时中的所有数据完全填充。仅在引导完成后才开始实际的数据处理。

KafkaStreams如何确定是否读取了所有数据?它是否读取所有时间戳低于KafkaStreams实例引导时间的消息?还是使用某种超时?

无论哪种方式,我都认为我们最好正确保留基础主题的保留日志压缩,否则重新启动可能需要一段时间。

apache-kafka-streams

1
推荐指数
1
解决办法
741
查看次数

带有替代方法的重载方法值表

我有以下编译器抱怨的代码:

val state: KTable[String, String]  = builder
  .table("BARY-PATH", Materialized.as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)

错误信息:

[error] /home/developer/Desktop/microservices/paths-stream/src/main/scala/io/khinkali/PathsTopology.scala:23:8: overloaded method value table with alternatives:
[error]   [K, V](x$1: String, x$2: org.apache.kafka.streams.kstream.Materialized[K,V,org.apache.kafka.streams.state.KeyValueStore[org.apache.kafka.common.utils.Bytes,Array[Byte]]])org.apache.kafka.streams.kstream.KTable[K,V] <and>
[error]   [K, V](x$1: String, x$2: org.apache.kafka.streams.Consumed[K,V])org.apache.kafka.streams.kstream.KTable[K,V]
[error]  cannot be applied to (String, org.apache.kafka.streams.kstream.Materialized[Nothing,Nothing,Nothing])
[error]       .table("BARY-PATH", Materialized.as("PATH-STORE"))
[error]        ^
Run Code Online (Sandbox Code Playgroud)

然后我尝试:

val state: KTable[String, String]  = builder
  .table[String, String]("BARY-PATH", Materialized[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]].as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)

编译器仍然抱怨:

[error] /home/developer/Desktop/microservices/paths-stream/src/main/scala/io/khinkali/PathsTopology.scala:24:43: object org.apache.kafka.streams.kstream.Materialized is not a value
[error]       .table[String, String]("BARY-PATH", Materialized[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]].as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)

我阅读了API 文档,但无法弄清楚,我做错了什么?

方法实现:

 * Materialize a {@link StateStore} …
Run Code Online (Sandbox Code Playgroud)

java scala apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
340
查看次数

如何定义 StreamsBuilderFactoryBean 的两个实例

我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean,如您所见,我定义了两个 StreamsBuilderFactoryBean,一个名为“ commonDSLBuilder”,另一个名为“ ”,propertyDSLBuilder带有props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). 所以“ commonDSLBuilde”只创建一个消费者,而“ propertyDSLBuilder”创建四个消费者。

 @Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig streamsConfig = new StreamsConfig(props);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilder.setSingleton(Boolean.FALSE);
        return streamsBuilder;
    }

    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1848
查看次数

Kafka Streams - 使用容错定义自定义关系/非键值状态存储

我正在尝试使用 kafka 实现事件溯源。

我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:

  • “表示”层被(由?)Kafka 流 API 取代。
  • 业务逻辑层由拓扑中的处理器 API 使用。
  • 此外,DB 是一个关系 H2 内存数据库,可通过 Spring Data JPA 存储库访问。存储库还实现了必要的接口,以便将它们注册为 Kafka 状态存储以使用好处(恢复和容错)

但我想知道我应该如何实现自定义状态存储部分?

我一直在寻找和:

  • 有一些接口,例如StateStore& StoreBuilderStoreBuilderwithLoggingEnabled()方法;但是如果我启用它,实际的更新和更改日志记录何时发生?通常示例都是键值存储,即使是自定义的。如果我不想要键值怎么办?kafka 文档中交互式查询部分的示例并没有削减它。

  • 我知道交互式查询。但它们似乎适合查询而不是更新;顾名思义。

在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?

apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
571
查看次数

Spring Cloud Stream Kafka - 找不到 Serde 类:org.apache.kafka.common.serialization.Serde$StringSerde

我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流来推送原始数据进行处理。但是当我尝试按键处理事件计数的流Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde时,我在运行应用程序时遇到异常。我检查了我的项目包含的库,我可以找到这个Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!

下面是我的源文件。

com.pgp.learn.kafka.analytics.AnalyticsApplication

package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class  AnalyticsApplication {

    public static void main(String[] …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1019
查看次数

与Kafka Spring Cloud Stream多个@EnableBinding

我试图设置一个监听Kafka的Spring Boot应用程序。

我正在使用Kafka Streams Binder。

一个简单的 @EnableBinding

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}
Run Code Online (Sandbox Code Playgroud)

和在 application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
1281
查看次数

使用kafka流获取时间窗口中给定键的最后一个事件

我开始使用 KStream 来使用来自现有主题的数据。

我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
Run Code Online (Sandbox Code Playgroud)

但我最终得到了所有事件,而不仅仅是最后一个。使用 KStream 可以做我想做的事吗?

java dsl stream apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1283
查看次数

KSQL 是在后台发出远程请求,还是 Table 实际上是一个全局 KTable?

我有一个包含客户记录的 Kafka 主题,称为“客户创建”。每个客户都是主题中的一个新记录。有4个分区。

我有两个基于 docker image 的 ksql-server 实例正在运行confluentinc/cp-ksql-server:5.3.0。两者都使用相同的KSQL 服务 ID

我创建了一个表:

CREATE TABLE t_customer (id VARCHAR, 
                         firstname VARCHAR, 
                         lastname VARCHAR)
WITH (KAFKA_TOPIC = 'customer-created', 
      VALUE_FORMAT='JSON', 
      KEY = 'id');
Run Code Online (Sandbox Code Playgroud)

我是 KSQL 的新手,但我的理解是 KSQL 构建在 Kafka Streams 之上,并且每个 ksql-server 实例大致相当于一个 Kafka 流应用程序实例。我注意到的第一件事是,一旦我启动 ksql-server 的新实例,它就已经知道在第一个实例上创建的表/流,即使它是开发人员模式下的交互式实例。其次,我可以根据两个实例的 ID 选择同一个客户,但我希望只能从其中一个实例中选择相同的客户,因为我假设 KSQL 表等同于 KTable,即它应该只包含本地数据,即来自 ksql-server 实例正在处理的分区。

SET 'auto.offset.reset'='earliest';
select * from t_customer where id = '7e1a141b-b8a6-4f4a-b368-45da2a9e92a1';
Run Code Online (Sandbox Code Playgroud)

无论我将 ksql-cli 附加到哪个 ksql-server 实例,我都会得到结果。在使用普通的 Kafka Streams 时,我可以让它工作的唯一方法是使用全局 KTable。我从两个实例中得到结果的事实让我有点惊讶,因为根据文档,“只有 …

apache-kafka-streams ksqldb

1
推荐指数
1
解决办法
318
查看次数