我只是想知道是否有可能替换用Java的Executor服务编写的旧的多线程代码到Akka。我对此没有什么疑问。
Is akka actor runs in their own thread?
How Threads will be assigned for the Actors ?
What are the pros and cons of migration of it is possible?
目前我使用固定线程池多线程,并提交一个可调用的。
示例代码,
public class KafkaConsumerFactory {
private static Map<String,KafkaConsumer> registry = new HashMap<>();
private static ThreadLocal<KafkaConsumer> consumers = new ThreadLocal<KafkaConsumer>(){
@Override
protected KafkaConsumer initialValue() {
return new KafkaConsumer(createConsumerConfig());
}
};
static {
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
registry.forEach((tid,con) -> {
try{
con.close();
} finally {
System.out.println("Yes!! Consumer for " + tid + " is closed.");
}
});
}
});
}
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "newcon-grp5");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaKryoSerde.class.getName());
return props;
}
public static <K,V> KafkaConsumer<K,V> createConsumer(){
registry.put(Thread.currentThread().getName(),consumers.get());
return consumers.get();
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class KafkaNewConsumer {
public static int MAX_THREADS = 10;
private ExecutorService es = null;
private boolean stopRequest = false;
public static void main(String[] args){
KafkaNewConsumer knc = new KafkaNewConsumer();
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run(){
knc.es.shutdown();
try {
knc.es.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}finally {
System.out.println("Finished");
}
}
});
knc.consumeTopic("rtest3",knc::recordConsuemer);
}
public void recordConsuemer(ConsumerRecord<?,?> record){
String result = new StringJoiner(": ")
.add(Thread.currentThread().getName())
.add("ts").add(String.valueOf(record.timestamp()))
.add("offset").add(String.valueOf(record.offset()))
.add("data").add(String.valueOf(record.value()))
.add("value-len").add(String.valueOf(record.serializedValueSize()))
.toString();
System.out.println(result);
}
public void consumeTopic(String topicName, Consumer<ConsumerRecord<?,?>> fun){
KafkaConsumer con= KafkaConsumerFactory.createConsumer();
int paritions = con.partitionsFor(topicName).size();
int noOfThread = (MAX_THREADS < paritions) ? MAX_THREADS :paritions;
es = Executors.newFixedThreadPool(noOfThread);
con.close();
for(int i=0;i<noOfThread;i++){
es.submit(()->{
KafkaConsumer consumer = KafkaConsumerFactory.createConsumer();
try{
while (!stopRequest){
consumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<?,?> records = consumer.poll(5000);
records.forEach(fun);
consumer.commitSync();
}
}catch(Exception e){
e.printStackTrace();
} finally {
consumer.close();
}
});
}
}
}
但没有解释它如何变得比线程更快?
我尝试了一些示例Akka(activator中的Akka示例)代码,并在所有Actor中打印了Thread.currentThread.getName,发现创建了名为(helloakka-akka.actor.default-dispatcher-x)的不同调度程序线程。
但怎么做?谁在创建那些线程?它们的配置在哪里?线程和执行元之间的映射关系是什么?
如果我需要100个线程来并行执行some任务的部分,我是否需要创建100个参与者并向每个参与者发送1条消息?或者我需要创建一个actor并在它的队列中放入100个消息,它将分叉成100个线程。
真的很困惑
迁移到actor系统对于基于executor的系统来说不是一个小任务,但这是可以完成的。它要求您重新思考您设计系统的方式,并考虑参与者的影响。例如,在一个线程架构中,您为业务流程创建一些处理程序,将其放在一个runnable中,并让它在线程上执行操作。这对于一个行动者范式来说是完全不合适的。您必须重新构建系统以处理消息传递和使用消息调用任务。此外,您还必须改变对业务流程的思考方式,从命令式方法转变为基于消息的方法。请考虑购买产品的简单任务。我会假设你知道如何在一个执行者做这件事。在actor系统中,您可以执行以下操作:
(购买产品)->UserActor->(BillCredit Card)->CCProcessing Actor->(购买已批准和已记账的项目)->库存管理器->……依此类推
在每个阶段,括号中的是一个异步消息,发送给相关的执行元,该执行元执行业务逻辑,然后将消息转发给流程中的下一个执行元。
现在这只是创建基于actor的系统的一种方法,还有很多其他技术,但核心的基本原则是,您不能将其作为一组步骤来考虑,而应将其作为每个步骤独立运行的集合来考虑。然后,消息按常规顺序通过系统,但您无法确定顺序,甚至无法确定消息是否会到达那里,因此您必须在语义上进行设计来处理这一点。在上面的系统中,我可能会让另一个参与者每两分钟检查一次未提交给Billing的孤立订单。当然,这意味着我的消息需要是理想的,以确保如果我发送他们第二次是可以的,他们不会向用户支付两次账单。
我知道我没有处理您的具体示例,我只是想为您提供一些上下文,即actor不仅仅是创建执行器的另一种方式(好吧,我想您可以用这种方式滥用它们,但这是不可取的),而是一种完全不同的设计范式。这是一个非常值得学习的范例,如果你有了飞跃,你就再也不会想做执行者了。
我想做的是,创建一个executor服务(具有固定数量的线程)-->,然后将新线程的创建/新线程的任务执行移交给该executor。 由于我对Executor非常陌生,所以我想知道的是,我如何更改上面的代码,以便不是形成一个新的单独的线程,而是在线程池中创建一个新线程。我看不到任何创建线程的命令(在线程池中)-->将上面的任务移交给那个线程(而不是像上面那样移交给一个独立的线程)。 请告诉我如何解
我通过使用Executor服务创建3个线程(extends Runnable)并提交它们来执行主类中的三个任务。如下所示: 当线程中出现异常时,我捕获它并执行System.exit(-1)。但是,如果发生任何异常,我需要返回到主类,并在那里执行一些语句。这怎么做?我们能在没有FutureTask的情况下从这些线程中返回一些东西吗?
有没有办法使用ExecutorService暂停/恢复特定线程? 想象一下,我想停止id=0的线程(假设为每个线程分配了一个增量id,直到达到线程池的大小)。 过了一会儿,按下一个按钮,比如说,我想恢复那个特定的线程,并让所有其他线程保持其当前状态,可以暂停或恢复。 我在Java文档中发现了PausableThreadPoolExecutor的一个未完成版本。但它不适合我的需要,因为它会恢复池中的
我正在探索将java web应用程序移动到Azure应用程序服务的可能性。应用程序on prem在启动时读取属性文件。 是否有可能将属性文件传递或放置到应用服务?如果没有,建议将此类遗留应用程序移动到Azure应用服务?
当我的应用程序启动时,将创建一个executor服务(在java.util.concurrent中使用Executors.NewFixedThreadPool(maxThreadNum))对象。当请求到来时,executor服务将创建线程来处理它们。 当应用程序启动时,它将在executorService池中创建200个线程。 只是想知道当应用程序启动时,这是一种正确的创建线程的方法吗?还是有更好
有时我看到一些线程还没有完成他们的工作,服务杀死那个线程,我怎么能强迫服务等待,直到线程完成他们的工作? 这是我的代码: 我看到了一些例外。future.is完成())块。我怎么能确保每一个未来是当执行者服务关闭?