Kafka流处理器线程安全吗?

Ehu*_*Lev 3 java multithreading apache-kafka-streams

我知道这个问题之前在这里被问过:Kafka Streaming Concurrency?

但这对我来说很奇怪。根据文档(或者也许我遗漏了一些东西),每个分区都有一个任务,意味着不同的处理器实例,并且每个任务都由不同的线程执行。但是当我测试它时,我发现不同的线程可以获得不同的处理器实例。因此,如果您想在处理器中保留任何内存状态(老式方式),则必须锁定?

示例代码:

public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);
Run Code Online (Sandbox Code Playgroud)

输出:

线程 ID:88 ID:26b11094-a094-404b-b610-88b38cc9d1ef

线程 ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:0a43ecb0-26f2-440d-88e2-87e0c9cc4927

线程 ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

有没有办法强制每个实例线程?

Mat*_*Sax 5

每个实例的线程数是一个配置参数(num.stream.threads默认值为1)。因此,如果您启动单个KafkaStreams实例,您就会获得num.stream.threads线程。

任务将工作拆分为并行单元(基于您的输入主题分区),并将分配给线程。因此,如果您有多个任务和一个线程,则所有任务都将分配给该线程。如果您有两个线程(所有实例的总和KafkaStreams),每个线程执行大约 50% 的任务。

KafkaStreams注意:由于 Kafka Streams 应用程序本质上是分布式的,因此无论您运行具有多个线程的单个实例,还是KafkaStreams运行每个线程一个线程的多个实例,都没有区别。任务将分布在应用程序的所有可用线程上。

如果您想在任务之间共享任何数据结构并且您有多个线程,则您有责任同步对此数据结构的访问。请注意,任务到线程的分配可能会在运行时发生变化,因此所有访问都必须同步。但是,不建议使用此模式,因为它限制了可扩展性。您应该设计没有共享数据结构的程序!主要原因是,您的程序通常分布在多台机器上,因此不同的KafkaStreams实例无论如何都无法访问共享的数据结构。共享数据结构只能在单个 JVM 中工作,但使用单个 JVM 会阻止应用程序的水平扩展。