使用Executor Service将Java多线程迁移到Akka

RBa*_*jee 3 java multithreading akka

我只是想知道是否有可能替换用Java的Executor服务编写的Akka旧的多线程代码。我对此毫不怀疑。

Is akka actor runs in their own thread? 

How Threads will be assigned for the Actors ?

What are the pros and cons of migration of it is possible?
Run Code Online (Sandbox Code Playgroud)

目前,我将固定线程池用于多线程,并提交可调用对象。

示例代码

public class KafkaConsumerFactory {

    private static Map<String,KafkaConsumer> registry = new HashMap<>();

    private static ThreadLocal<KafkaConsumer> consumers = new ThreadLocal<KafkaConsumer>(){
        @Override
        protected KafkaConsumer initialValue() {
            return new KafkaConsumer(createConsumerConfig());
        }
    };

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                registry.forEach((tid,con) -> {
                    try{
                        con.close();
                    } finally {
                        System.out.println("Yes!! Consumer for " + tid + " is closed.");
                    }
                });
            }
        });
    }

    private static Properties createConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "newcon-grp5");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", KafkaKryoSerde.class.getName());
        return props;
    }


    public static <K,V> KafkaConsumer<K,V> createConsumer(){
        registry.put(Thread.currentThread().getName(),consumers.get());
        return consumers.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

///////////////////////////////////////////////////// ///////

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class KafkaNewConsumer {
    public static int MAX_THREADS = 10;
    private ExecutorService es = null;
    private boolean stopRequest = false;




    public static void main(String[] args){
        KafkaNewConsumer knc = new KafkaNewConsumer();
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run(){
                knc.es.shutdown();
                try {
                    knc.es.awaitTermination(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ignored) {

                }finally {
                    System.out.println("Finished");
                }
            }
        });

        knc.consumeTopic("rtest3",knc::recordConsuemer);

    }

    public void recordConsuemer(ConsumerRecord<?,?> record){
        String result = new StringJoiner(": ")
                .add(Thread.currentThread().getName())
                .add("ts").add(String.valueOf(record.timestamp()))
                .add("offset").add(String.valueOf(record.offset()))
                .add("data").add(String.valueOf(record.value()))
                .add("value-len").add(String.valueOf(record.serializedValueSize()))
                .toString();
        System.out.println(result);
    }
    public void  consumeTopic(String topicName, Consumer<ConsumerRecord<?,?>> fun){
        KafkaConsumer con= KafkaConsumerFactory.createConsumer();
        int paritions = con.partitionsFor(topicName).size();
        int noOfThread = (MAX_THREADS < paritions) ? MAX_THREADS :paritions;
         es = Executors.newFixedThreadPool(noOfThread);
        con.close();
        for(int i=0;i<noOfThread;i++){
            es.submit(()->{
                KafkaConsumer consumer = KafkaConsumerFactory.createConsumer();
                try{
                    while (!stopRequest){
                        consumer.subscribe(Collections.singletonList(topicName));
                        ConsumerRecords<?,?> records = consumer.poll(5000);

                        records.forEach(fun);
                        consumer.commitSync();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                } finally {
                    consumer.close();
                }
            });
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我浏览了一些互联网教程,其中一些直接总结了

演员非常好,比传统线程快。

但是没有任何解释说明它如何变得比线程更快?

我尝试了一些示例Akka(来自激活程序的Akka示例)代码,并在所有actor中打印了Thread.currentThread.getName,发现创建了名为(helloakka-akka.actor.default-dispatcher-X)的不同调度程序线程。

但是如何?谁在创建那些线程?它们的配置在哪里?线程和Actor之间的映射关系是什么?

每次发送消息时,Akka都会创建新线程吗?还是在内部使用线程池?

如果我需要100个线程来并行执行某些任务的各个部分,是否需要创建100个Actor并向其中的每个发送1条消息?或者我需要创建1个actor并在队列中放入100条消息,它将分叉到100个线程中。

真的很困惑

Rob*_*Jr. 5

对于基于执行者的系统来说,迁移到参与者系统并不是一件容易的事,但是可以做到。它要求您重新考虑设计系统的方式并考虑参与者的影响。例如,在线程体系结构中,您为业务流程创建了一些处理程序,将其扔到可运行的环境中,然后让它在线程上运行。这对于演员范式是完全不合适的。您必须重新设计系统,以处理消息传递并使用消息传递来调用任务。此外,您还必须将对业务流程的思考方式从命令式方法更改为基于消息的方法。作为示例,请考虑购买产品的简单任务。我假设您知道如何在执行程序中执行此操作。在actor系统中,您可以执行以下操作:

(购买产品)-> UserActor->(BillCredit卡)-> CCProcessing Actor->(购买批准和计费项目)->库存管理器-> ...等

在每个阶段,括号中的内容是发送给相关参与者的异步消息,该参与者执行业务逻辑,然后将消息转发给流程中的下一个参与者。

现在,这只是创建基于参与者的系统的一种方法,还有许多其他技术,但是核心基础是您不能命令性地思考,而是作为每个独立运行的步骤的集合。然后,消息按常规顺序在系统中爆炸,但是您无法确定顺序,甚至无法确定消息是否到达目的地,因此您必须设计语义来处理该问题。在上面的系统中,我可能让另一个参与者每两分钟检查一次尚未提交给开票的孤立订单。当然,这意味着我的消息需要确定性,以确保是否第二次向我发送消息时,它们不会向用户收费两次。

我知道我没有处理您的特定示例,我只是想为您提供一些背景信息,参与者不仅是创建执行者的另一种方式(我想您可以以这种方式滥用执行者,但不建议这样做),而是完全不同的设计范式。这是一个非常值得学习的范例,可以肯定地学习,如果您进行了跨越,您将再也不想做执行者了。