我有n个从驱动流中检索记录的工作线程(这对这个问题并不重要),然后将它们推送到执行器服务,在那里处理记录并将其持久化到后端数据库。这个相同的执行器服务实例用于所有工作线程。
现在有一个场景,任何给定的worker循环都会停止处理记录和块,直到它提交的所有记录都得到完全处理。这本质上意味着,对于来自特定工作线程的记录,执行器服务中不应该有挂起/正在运行的线程。
实现的一个非常简单的示例如下:
>
工人阶级
public class Worker {
Worker(Listener listener){
this.listener = listener;
}
//called periodically to fetch records from a kinesis stream
public void processRecords(Record records) {
for (Record record : records) {
listener.handleRecord(record);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed()
}
}
听者类
public class Listener {
ExecutorService es;
// same executor service is shared across all listeners.
Listener(ExecutorService es){
this.es = es;
}
public void handleRecord(Record record) {
//submit record to es and return
// non blocking
}
public boolean blockTillAllRecordsAreProcessed(){
// this should block until all records are processed
// no clue how to implement this with a common es
}
}
我能想到的唯一方法是为每个工作人员提供本地执行器服务,并为每个批次执行类似调用All
的操作,这将稍微改变实现但完成工作。但我觉得应该有更好的方法来解决这个问题。
您可以使用CountdownLatch类来阻止,如下所示:
public void processRecords(List<Record> records) {
CountDownLatch latch = new CountDownLatch(records.size());
for (Record record : records) {
listener.handleRecord(record, latch);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed(latch)
}
public class Listener {
ExecutorService es;
...
public void handleRecord(Record record, CountDownLatch latch) {
//submit record to es and return
// non blocking
es.submit(()->{
someSyncTask(record);
latch.countDown();
})
}
public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
System.out.println("waiting for processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这里阅读更多:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
4.4.1.2 创建/使用公共服务 公共服务是应该由未指定的大量应用使用的服务。 有必要注意,它可能会收到恶意软件发送的信息(意图等)。 在使用公共服务的情况下,有必要注意,恶意软件可能会收到要发送的信息(意图等)。 下面展示了如何使用startService类型服务的示例代码。 要点(创建服务): 将导出属性显式设置为true。 小心并安全地处理接收到的意图。 返回结果时,请勿包含敏感信息。 A
我刚开始学习Java多线程。我从这个简单的代码开始,但是似乎从未实现。 我希望“任务执行”结果应该被打印出来,但我得到了一个空控制台。 下面是我的简单类:
我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅
主要内容:Executor接口中的方法,实例接口是支持启动新任务的一个简单接口。 Executor接口中的方法 序号 方法 描述 1 在将来的某个时间执行给定的命令。 实例 以下程序显示了如何在基于线程的环境中接口的用法。 执行上面代码,得到如下结果 -
我在玩java多线程代码。我创建了一个具有固定线程池的executor服务。我正在提交两个任务顺序。我试图用线程使第一个任务变得很长。我在想这两个任务将并行运行。然而,当我运行程序时,程序会等待一段时间,然后输出一个B,这意味着编译器首先完成了第一个任务,然后才执行第二个任务。事实上,我在期待,因为第二个任务是一个短任务,它会在第一个任务之前完成。有什么解释吗?
我附上了一个应用程序的示例代码,它在我的Core i3 370M笔记本电脑上(Win 7 64bit,Java 1.8.0.4564bit)在大约20秒内重现了这个问题。这个应用程序读取识别文本蕴涵(RTE)语料库的XML文件,然后使用标准Java并发类同时解析所有句子。本地RTE XML文件的路径需要作为命令行参数给出。在我的测试中,我使用了以下公开的XML文件:http://www.nist.