流运行一定时间后给出以下错误?我找不到谁负责创建 .sst 文件?
环境:
卡夫卡版本 0.10.0-cp1
Scala 2.11.8
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
at org.rocksdb.RocksDB.flush(Native Method)
at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
at …Run Code Online (Sandbox Code Playgroud) 鉴于:我在Kafka中有两个主题让我们说主题A和主题B.Kafka Stream从主题A读取记录,处理它并产生与消费记录对应的多个记录(比如说记录A和记录B).现在,问题是如何使用Kafka Streams实现这一目标.
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
Run Code Online (Sandbox Code Playgroud)
这里,读取的记录是Message; 处理完毕后,返回Message列表.如何将此列表分成两个生产者流?任何帮助将不胜感激.
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我用来创建GlobalKTable的主题非常活跃。在KStream-GlobalKTable连接的文档中,我阅读了
该
GlobalKTable是在完全自举(重)启动的KafkaStreams情况下,这意味着表是用下面的话题,可在启动时中的所有数据完全填充。仅在引导完成后才开始实际的数据处理。
KafkaStreams如何确定是否读取了所有数据?它是否读取所有时间戳低于KafkaStreams实例引导时间的消息?还是使用某种超时?
无论哪种方式,我都认为我们最好正确保留基础主题的保留和日志压缩,否则重新启动可能需要一段时间。
我有以下编译器抱怨的代码:
val state: KTable[String, String] = builder
.table("BARY-PATH", Materialized.as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)
错误信息:
[error] /home/developer/Desktop/microservices/paths-stream/src/main/scala/io/khinkali/PathsTopology.scala:23:8: overloaded method value table with alternatives:
[error] [K, V](x$1: String, x$2: org.apache.kafka.streams.kstream.Materialized[K,V,org.apache.kafka.streams.state.KeyValueStore[org.apache.kafka.common.utils.Bytes,Array[Byte]]])org.apache.kafka.streams.kstream.KTable[K,V] <and>
[error] [K, V](x$1: String, x$2: org.apache.kafka.streams.Consumed[K,V])org.apache.kafka.streams.kstream.KTable[K,V]
[error] cannot be applied to (String, org.apache.kafka.streams.kstream.Materialized[Nothing,Nothing,Nothing])
[error] .table("BARY-PATH", Materialized.as("PATH-STORE"))
[error] ^
Run Code Online (Sandbox Code Playgroud)
然后我尝试:
val state: KTable[String, String] = builder
.table[String, String]("BARY-PATH", Materialized[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]].as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)
编译器仍然抱怨:
[error] /home/developer/Desktop/microservices/paths-stream/src/main/scala/io/khinkali/PathsTopology.scala:24:43: object org.apache.kafka.streams.kstream.Materialized is not a value
[error] .table[String, String]("BARY-PATH", Materialized[String, String,KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]].as("PATH-STORE"))
Run Code Online (Sandbox Code Playgroud)
我阅读了API 文档,但无法弄清楚,我做错了什么?
方法实现:
* Materialize a {@link StateStore} …Run Code Online (Sandbox Code Playgroud) 我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean,如您所见,我定义了两个 StreamsBuilderFactoryBean,一个名为“ commonDSLBuilder”,另一个名为“ ”,propertyDSLBuilder带有props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). 所以“ commonDSLBuilde”只创建一个消费者,而“ propertyDSLBuilder”创建四个消费者。
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
@Value("${spring.kafka.stream.application-id}")
private String applicationId;
@Bean(name = "commonDSLBuilder")
public StreamsBuilderFactoryBean commonDSLBuilder() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsConfig streamsConfig = new StreamsConfig(props);
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
streamsBuilder.setSingleton(Boolean.FALSE);
return streamsBuilder;
}
@Bean(name = "propertyDSLBuilder")
public StreamsBuilderFactoryBean propertyDSLBuilder() {
Map<String, Object> …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 kafka 实现事件溯源。
我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:
但我想知道我应该如何实现自定义状态存储部分?
我一直在寻找和:
有一些接口,例如StateStore& StoreBuilder。StoreBuilder有withLoggingEnabled()方法;但是如果我启用它,实际的更新和更改日志记录何时发生?通常示例都是键值存储,即使是自定义的。如果我不想要键值怎么办?kafka 文档中交互式查询部分的示例并没有削减它。
我知道交互式查询。但它们似乎适合查询而不是更新;顾名思义。
在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?
我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流来推送原始数据进行处理。但是当我尝试按键处理事件计数的流Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde时,我在运行应用程序时遇到异常。我检查了我的项目包含的库,我可以找到这个Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!
下面是我的源文件。
com.pgp.learn.kafka.analytics.AnalyticsApplication
package com.pgp.learn.kafka.analytics;
import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class AnalyticsApplication {
public static void main(String[] …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka
我试图设置一个监听Kafka的Spring Boot应用程序。
我正在使用Kafka Streams Binder。
一个简单的 @EnableBinding
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}
interface StreamProcessor {
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
}
}
Run Code Online (Sandbox Code Playgroud)
和在 application.yml
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
bindings:
input_1:
destination: mytopic1
group: readgroup
output_1:
destination: mytopic2
input_2:
destination: mytopic3 …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-boot spring-cloud-stream apache-kafka-streams
我开始使用 KStream 来使用来自现有主题的数据。
我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
Run Code Online (Sandbox Code Playgroud)
但我最终得到了所有事件,而不仅仅是最后一个。使用 KStream 可以做我想做的事吗?
我有一个包含客户记录的 Kafka 主题,称为“客户创建”。每个客户都是主题中的一个新记录。有4个分区。
我有两个基于 docker image 的 ksql-server 实例正在运行confluentinc/cp-ksql-server:5.3.0。两者都使用相同的KSQL 服务 ID。
我创建了一个表:
CREATE TABLE t_customer (id VARCHAR,
firstname VARCHAR,
lastname VARCHAR)
WITH (KAFKA_TOPIC = 'customer-created',
VALUE_FORMAT='JSON',
KEY = 'id');
Run Code Online (Sandbox Code Playgroud)
我是 KSQL 的新手,但我的理解是 KSQL 构建在 Kafka Streams 之上,并且每个 ksql-server 实例大致相当于一个 Kafka 流应用程序实例。我注意到的第一件事是,一旦我启动 ksql-server 的新实例,它就已经知道在第一个实例上创建的表/流,即使它是开发人员模式下的交互式实例。其次,我可以根据两个实例的 ID 选择同一个客户,但我希望只能从其中一个实例中选择相同的客户,因为我假设 KSQL 表等同于 KTable,即它应该只包含本地数据,即来自 ksql-server 实例正在处理的分区。
SET 'auto.offset.reset'='earliest';
select * from t_customer where id = '7e1a141b-b8a6-4f4a-b368-45da2a9e92a1';
Run Code Online (Sandbox Code Playgroud)
无论我将 ksql-cli 附加到哪个 ksql-server 实例,我都会得到结果。在使用普通的 Kafka Streams 时,我可以让它工作的唯一方法是使用全局 KTable。我从两个实例中得到结果的事实让我有点惊讶,因为根据文档,“只有 …
apache-kafka ×8
java ×3
spring-kafka ×2
dsl ×1
ksqldb ×1
rocksdb ×1
scala ×1
spring ×1
spring-boot ×1
stream ×1