我对 kafka 文档中关于这个主题的措辞有点困惑,所以我想在这里问我是否正确地解释了这些内容?
因此,如果我正确理解这种扩展 Kafka Stream 应用程序的唯一方法是启动应用程序的新实例(或增加 application 中的流线程数量),这将确保 ConsumerGroup('application. id'),这样我就可以将流应用程序扩展到主题的分区数量(如果我的流拓扑连接到多个主题,实际上会发生什么,假设 TopicA 有 5 个分区,topicB 有 3 个分区,我加入了 TopicA 和 TopicB 的流,我猜在这种情况下我可以扩展到 3 个实例/线程)。
现在假设我有一个包含 5 个分区的 topicA,并且启动了应用程序的 3 个实例,如果我在拓扑中配置了 KTable,则每个 KTable 将包含来自特定分区的信息,并且我必须找出我的哪个实例(分区)上的元数据关键是,那么当我启动第四个实例时会发生什么,假设实例3上的KTable的键/值现在可以转到实例4上的KTable,不是吗?一方面问题是这样的重新平衡需要多长时间(我认为这取决于主题大小,所以假设需要 1 分钟,我正在查询 KTable 的应用程序在此操作期间会没有响应吗?)
附带问题是,此机制对于“streamBuilder.table(..)”和“streambuilder.groupByKey(..).reduce(..)”的工作原理是否完全相同?
最后一个问题,同样是一个具有 5 个分区的主题,但我没有启动 3 个应用程序实例,而是启动了一个具有 3 个流线程的实例 (num.stream.threads = 3),我会再次拥有 3 个 KTable 代表 5 个分区吗?如果我将线程大小从 3 更改为 4,其行为与增加实例数完全相同。
感谢您的回答..
我无法在此处以及 Spring 网站和博客上找到 Spring Cloud Stream 是否能够提供 Kafka Stream API 提供的“Exactly Once”语义。也许没有单个配置/注释,并且在线程“是否可以使用 Spring Cloud Stream 进行一次处理? ”我可以找到一些有用的东西,但专家的答案是非常高的水平。感谢帮助
我有使用 Kafka Streams 处理的记录(使用处理器 API)。假设该记录有city_id和一些其他字段。
在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City对存储在例如中。Postgres。
在 Java 应用程序中,我可以使用 JDBC 连接到 Postgres 并进行构建,new HashMap<CityId, Temperature>这样我就可以根据city_id. 就像是tempHM.get(record.city_id)。
有几个问题如何最好地处理它:
最初,我一直在内部执行此操作,AbstractProcessor::init()但这似乎是错误的,因为它是为每个线程初始化的,并且还在重新平衡时重新初始化。
因此,我在使用它构建流拓扑构建器和处理器之前移动了它。在所有处理器实例上仅独立提取一次数据。
这是正确有效的方法吗?它有效,但是...
HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;
// Connect to DB and initialize tempHM here
Topology topology = new Topology();
topology
.addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")
.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)
.addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;
Run Code Online (Sandbox Code Playgroud)
例如,我想每 15 分钟刷新一次温度数据。我正在考虑使用 Hashmap 容器而不是 Hashmap,这样可以处理它: …
我试图更好地了解如何设置集群来运行 Kafka-Stream 应用程序。我试图更好地了解所涉及的数据量。
在这方面,虽然我可以很快看到 KTable 需要状态存储,但我想知道从主题创建 Kstream 是否立即意味着将该主题的所有日志复制到状态存储中,显然是以我认为的仅附加方式。也就是说,特别是如果我们想公开查询流?
当数据是 Kstream 时,当数据在源主题中移动时,Kafka 是否会自动复制状态存储中的数据?如上所述,由于更新,这对于 Ktable 来说听起来很明显,但对于 Kstream 我只想确认会发生什么。
我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。
配置可能是这样的:
spring:
application:
name: shipping-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
order:
destination: order
output:
destination: order
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)
笔记:
我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?
apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。
为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。
第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)
第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误
java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行
是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢
java spring apache-kafka spring-cloud-stream apache-kafka-streams
您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:
@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}
Run Code Online (Sandbox Code Playgroud) apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
我想问一下,如果我不使用有状态流,我的 KafkaStreamsConfiguration 中是否需要复制因子。我不使用这个RockDB。据我所知,复制因子设置适用于更改日志和重新分区主题。我理解这个变更日志主题,但是这个重新分区主题让我有点困惑...有人可以用非常基本的语言向我解释这个重新分区主题是什么,以及如果我不在流应用程序中使用状态,我是否应该关心这个复制因子?
问候
我创建了一个生产MSG到一个主题A的生产者,我需要的是我想在那个MSG中进行更改并希望将其发送到另一个主题B,我正在尝试通过Kafka流做到这一点,但不确定是否是正确与否.如果它需要Kafka流,那么请分享应该写的代码?
我根据本教程编写了一个简单的kafka-stream程序:http:
//kafka.apache.org/10/documentation/streams/tutorial
Pipe.java:
package eric.kafka.stream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
/**
* kafka-stream - pipe,
*/
public class Pipe {
// topic names,
public static final String TOPIC_INPUT = "streams-plaintext-input";
public static final String TOPIC_OUTPUT = "streams-pipe-output";
public static void pipe() {
// set up properties,
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / …Run Code Online (Sandbox Code Playgroud)