标签: apache-kafka-streams

如何扩展 Kafka Stream 应用程序

我对 kafka 文档中关于这个主题的措辞有点困惑,所以我想在这里问我是否正确地解释了这些内容?

因此,如果我正确理解这种扩展 Kafka Stream 应用程序的唯一方法是启动应用程序的新实例(或增加 application 中的流线程数量),这将确保 ConsumerGroup('application. id'),这样我就可以将流应用程序扩展到主题的分区数量(如果我的流拓扑连接到多个主题,实际上会发生什么,假设 TopicA 有 5 个分区,topicB 有 3 个分区,我加入了 TopicA 和 TopicB 的流,我猜在这种情况下我可以扩展到 3 个实例/线程)。

现在假设我有一个包含 5 个分区的 topicA,并且启动了应用程序的 3 个实例,如果我在拓扑中配置了 KTable,则每个 KTable 将包含来自特定分区的信息,并且我必须找出我的哪个实例(分区)上的元数据关键是,那么当我启动第四个实例时会发生什么,假设实例3上的KTable的键/值现在可以转到实例4上的KTable,不是吗?一方面问题是这样的重新平衡需要多长时间(我认为这取决于主题大小,所以假设需要 1 分钟,我正在查询 KTable 的应用程序在此操作期间会没有响应吗?)

附带问题是,此机制对于“streamBuilder.table(..)”和“streambuilder.groupByKey(..).reduce(..)”的工作原理是否完全相同?

最后一个问题,同样是一个具有 5 个分区的主题,但我没有启动 3 个应用程序实例,而是启动了一个具有 3 个流线程的实例 (num.stream.threads = 3),我会再次拥有 3 个 KTable 代表 5 个分区吗?如果我将线程大小从 3 更改为 4,其行为与增加实例数完全相同。

感谢您的回答..

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2359
查看次数

Spring Cloud Stream 与 Kafka Stream 的 Exactly Once 功能对比

我无法在此处以及 Spring 网站和博客上找到 Spring Cloud Stream 是否能够提供 Kafka Stream API 提供的“Exactly Once”语义。也许没有单个配置/注释,并且在线程“是否可以使用 Spring Cloud Stream 进行一次处理? ”我可以找到一些有用的东西,但专家的答案是非常高的水平。感谢帮助

apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
2646
查看次数

如何将外部源的上下文添加到 Kafka Streams 中的记录的正确方法

我有使用 Kafka Streams 处理的记录(使用处理器 API)。假设该记录有city_id和一些其他字段。

在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City对存储在例如中。Postgres。

在 Java 应用程序中,我可以使用 JDBC 连接到 Postgres 并进行构建,new HashMap<CityId, Temperature>这样我就可以根据city_id. 就像是tempHM.get(record.city_id)

有几个问题如何最好地处理它:

在哪里启动上下文数据?

最初,我一直在内部执行此操作,AbstractProcessor::init()但这似乎是错误的,因为它是为每个线程初始化的,并且还在重新平衡时重新初始化。

因此,我在使用它构建流拓扑构建器和处理器之前移动了它。在所有处理器实例上仅独立提取一次数据。

这是正确有效的方法吗?它有效,但是...

HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;

// Connect to DB and initialize tempHM here

Topology topology = new Topology();

topology
    .addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")

    .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)

    .addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;
Run Code Online (Sandbox Code Playgroud)

如何刷新上下文数据?

例如,我想每 15 分钟刷新一次温度数据。我正在考虑使用 Hashmap 容器而不是 Hashmap,这样可以处理它: …

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
658
查看次数

Kstream 的成本与 KTable 相对于状态存储的成本

我试图更好地了解如何设置集群来运行 Kafka-Stream 应用程序。我试图更好地了解所涉及的数据量。

在这方面,虽然我可以很快看到 KTable 需要状态存储,但我想知道从主题创建 Kstream 是否立即意味着将该主题的所有日志复制到状态存储中,显然是以我认为的仅附加方式。也就是说,特别是如果我们想公开查询流?

当数据是 Kstream 时,当数据在源主题中移动时,Kafka 是否会自动复制状态存储中的数据?如上所述,由于更新,这对于 Ktable 来说听起来很明显,但对于 Kstream 我只想确认会发生什么。

apache-kafka apache-kafka-streams ksqldb

1
推荐指数
1
解决办法
943
查看次数

如何测试使用 Avro 和 Confluence Schema Registry 的 Spring Cloud Stream Kafka Streams 应用程序?

我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。

配置可能是这样的:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 它使用本机序列化/反序列化。
  • 测试框架:Junit 5

我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2098
查看次数

将 StateRestoreListener 与 Spring Cloud Kafka Streams 绑定器结合使用

我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行

是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢

java spring apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
1338
查看次数

当控制转到catch块时如何停止发送到kafka主题功能性kafka spring

您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:

@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}

    
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1926
查看次数

变更日志主题和重新分区主题 kafka 流

我想问一下,如果我不使用有状态流,我的 KafkaStreamsConfiguration 中是否需要复制因子。我不使用这个RockDB。据我所知,复制因子设置适用于更改日志和重新分区主题。我理解这个变更日志主题,但是这个重新分区主题让我有点困惑...有人可以用非常基本的语言向我解释这个重新分区主题是什么,以及如果我不在流应用程序中使用状态,我是否应该关心这个复制因子?

问候

apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2149
查看次数

如何修改一个kafka主题的消息并使用java发送到另一个kafka主题?

我创建了一个生产MSG到一个主题A的生产者,我需要的是我想在那个MSG中进行更改并希望将其发送到另一个主题B,我正在尝试通过Kafka流做到这一点,但不确定是否是正确与否.如果它需要Kafka流,那么请分享应该写的代码?

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2763
查看次数

kafka-stream:获取CorruptRecordException

我根据本教程编写了一个简单的kafka-stream程序:http:
//kafka.apache.org/10/documentation/streams/tutorial


程序

Pipe.java:

package eric.kafka.stream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

/**
 * kafka-stream - pipe,
 */
public class Pipe {
    // topic names,
    public static final String TOPIC_INPUT = "streams-plaintext-input";
    public static final String TOPIC_OUTPUT = "streams-pipe-output";

    public static void pipe() {
        // set up properties,
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2509
查看次数