当前位置: 首页 > 知识库问答 >
问题:

将带有Executor服务的Java多线程迁移到Akka

松正阳
2023-03-14

我只是想知道是否有可能替换用Java的Executor服务编写的旧的多线程代码到Akka。我对此没有什么疑问。

Is akka actor runs in their own thread? 

How Threads will be assigned for the Actors ?

What are the pros and cons of migration of it is possible?

目前我使用固定线程池多线程,并提交一个可调用的。

示例代码,

public class KafkaConsumerFactory {

    private static Map<String,KafkaConsumer> registry = new HashMap<>();

    private static ThreadLocal<KafkaConsumer> consumers = new ThreadLocal<KafkaConsumer>(){
        @Override
        protected KafkaConsumer initialValue() {
            return new KafkaConsumer(createConsumerConfig());
        }
    };

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                registry.forEach((tid,con) -> {
                    try{
                        con.close();
                    } finally {
                        System.out.println("Yes!! Consumer for " + tid + " is closed.");
                    }
                });
            }
        });
    }

    private static Properties createConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "newcon-grp5");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", KafkaKryoSerde.class.getName());
        return props;
    }


    public static <K,V> KafkaConsumer<K,V> createConsumer(){
        registry.put(Thread.currentThread().getName(),consumers.get());
        return consumers.get();
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class KafkaNewConsumer {
    public static int MAX_THREADS = 10;
    private ExecutorService es = null;
    private boolean stopRequest = false;




    public static void main(String[] args){
        KafkaNewConsumer knc = new KafkaNewConsumer();
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run(){
                knc.es.shutdown();
                try {
                    knc.es.awaitTermination(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ignored) {

                }finally {
                    System.out.println("Finished");
                }
            }
        });

        knc.consumeTopic("rtest3",knc::recordConsuemer);

    }

    public void recordConsuemer(ConsumerRecord<?,?> record){
        String result = new StringJoiner(": ")
                .add(Thread.currentThread().getName())
                .add("ts").add(String.valueOf(record.timestamp()))
                .add("offset").add(String.valueOf(record.offset()))
                .add("data").add(String.valueOf(record.value()))
                .add("value-len").add(String.valueOf(record.serializedValueSize()))
                .toString();
        System.out.println(result);
    }
    public void  consumeTopic(String topicName, Consumer<ConsumerRecord<?,?>> fun){
        KafkaConsumer con= KafkaConsumerFactory.createConsumer();
        int paritions = con.partitionsFor(topicName).size();
        int noOfThread = (MAX_THREADS < paritions) ? MAX_THREADS :paritions;
         es = Executors.newFixedThreadPool(noOfThread);
        con.close();
        for(int i=0;i<noOfThread;i++){
            es.submit(()->{
                KafkaConsumer consumer = KafkaConsumerFactory.createConsumer();
                try{
                    while (!stopRequest){
                        consumer.subscribe(Collections.singletonList(topicName));
                        ConsumerRecords<?,?> records = consumer.poll(5000);

                        records.forEach(fun);
                        consumer.commitSync();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                } finally {
                    consumer.close();
                }
            });
        }
    }
}

但没有解释它如何变得比线程更快?

我尝试了一些示例Akka(activator中的Akka示例)代码,并在所有Actor中打印了Thread.currentThread.getName,发现创建了名为(helloakka-akka.actor.default-dispatcher-x)的不同调度程序线程。

但怎么做?谁在创建那些线程?它们的配置在哪里?线程和执行元之间的映射关系是什么?

如果我需要100个线程来并行执行some任务的部分,我是否需要创建100个参与者并向每个参与者发送1条消息?或者我需要创建一个actor并在它的队列中放入100个消息,它将分叉成100个线程。

真的很困惑

共有1个答案

夏谦
2023-03-14

迁移到actor系统对于基于executor的系统来说不是一个小任务,但这是可以完成的。它要求您重新思考您设计系统的方式,并考虑参与者的影响。例如,在一个线程架构中,您为业务流程创建一些处理程序,将其放在一个runnable中,并让它在线程上执行操作。这对于一个行动者范式来说是完全不合适的。您必须重新构建系统以处理消息传递和使用消息调用任务。此外,您还必须改变对业务流程的思考方式,从命令式方法转变为基于消息的方法。请考虑购买产品的简单任务。我会假设你知道如何在一个执行者做这件事。在actor系统中,您可以执行以下操作:

(购买产品)->UserActor->(BillCredit Card)->CCProcessing Actor->(购买已批准和已记账的项目)->库存管理器->……依此类推

在每个阶段,括号中的是一个异步消息,发送给相关的执行元,该执行元执行业务逻辑,然后将消息转发给流程中的下一个执行元。

现在这只是创建基于actor的系统的一种方法,还有很多其他技术,但核心的基本原则是,您不能将其作为一组步骤来考虑,而应将其作为每个步骤独立运行的集合来考虑。然后,消息按常规顺序通过系统,但您无法确定顺序,甚至无法确定消息是否会到达那里,因此您必须在语义上进行设计来处理这一点。在上面的系统中,我可能会让另一个参与者每两分钟检查一次未提交给Billing的孤立订单。当然,这意味着我的消息需要是理想的,以确保如果我发送他们第二次是可以的,他们不会向用户支付两次账单。

我知道我没有处理您的具体示例,我只是想为您提供一些上下文,即actor不仅仅是创建执行器的另一种方式(好吧,我想您可以用这种方式滥用它们,但这是不可取的),而是一种完全不同的设计范式。这是一个非常值得学习的范例,如果你有了飞跃,你就再也不会想做执行者了。

 类似资料:
  • 我想做的是,创建一个executor服务(具有固定数量的线程)-->,然后将新线程的创建/新线程的任务执行移交给该executor。 由于我对Executor非常陌生,所以我想知道的是,我如何更改上面的代码,以便不是形成一个新的单独的线程,而是在线程池中创建一个新线程。我看不到任何创建线程的命令(在线程池中)-->将上面的任务移交给那个线程(而不是像上面那样移交给一个独立的线程)。 请告诉我如何解

  • 我通过使用Executor服务创建3个线程(extends Runnable)并提交它们来执行主类中的三个任务。如下所示: 当线程中出现异常时,我捕获它并执行System.exit(-1)。但是,如果发生任何异常,我需要返回到主类,并在那里执行一些语句。这怎么做?我们能在没有FutureTask的情况下从这些线程中返回一些东西吗?

  • 有没有办法使用ExecutorService暂停/恢复特定线程? 想象一下,我想停止id=0的线程(假设为每个线程分配了一个增量id,直到达到线程池的大小)。 过了一会儿,按下一个按钮,比如说,我想恢复那个特定的线程,并让所有其他线程保持其当前状态,可以暂停或恢复。 我在Java文档中发现了PausableThreadPoolExecutor的一个未完成版本。但它不适合我的需要,因为它会恢复池中的

  • 我正在探索将java web应用程序移动到Azure应用程序服务的可能性。应用程序on prem在启动时读取属性文件。 是否有可能将属性文件传递或放置到应用服务?如果没有,建议将此类遗留应用程序移动到Azure应用服务?

  • 当我的应用程序启动时,将创建一个executor服务(在java.util.concurrent中使用Executors.NewFixedThreadPool(maxThreadNum))对象。当请求到来时,executor服务将创建线程来处理它们。 当应用程序启动时,它将在executorService池中创建200个线程。 只是想知道当应用程序启动时,这是一种正确的创建线程的方法吗?还是有更好

  • 有时我看到一些线程还没有完成他们的工作,服务杀死那个线程,我怎么能强迫服务等待,直到线程完成他们的工作? 这是我的代码: 我看到了一些例外。future.is完成())块。我怎么能确保每一个未来是当执行者服务关闭?