当前位置: 首页 > 知识库问答 >
问题:

Java—使用公共executor服务实例进行并发处理

东深
2023-03-14

我有n个从驱动流中检索记录的工作线程(这对这个问题并不重要),然后将它们推送到执行器服务,在那里处理记录并将其持久化到后端数据库。这个相同的执行器服务实例用于所有工作线程。

现在有一个场景,任何给定的worker循环都会停止处理记录和块,直到它提交的所有记录都得到完全处理。这本质上意味着,对于来自特定工作线程的记录,执行器服务中不应该有挂起/正在运行的线程。

实现的一个非常简单的示例如下:

>

  • 工人阶级

    public class Worker {
    
    Worker(Listener listener){
        this.listener = listener;
    }
    
    //called periodically to fetch records from a kinesis stream
    public void processRecords(Record records) {
    
        for (Record record : records) {
            listener.handleRecord(record);
        }
    
        //if 15 minutes has elapsed, run below code. This is blocking.
        listener.blockTillAllRecordsAreProcessed()
    }
    

    }

    听者类

    public class Listener {
    
        ExecutorService es;
    
        // same executor service is shared across all listeners.
        Listener(ExecutorService es){
            this.es = es;
        }
    
        public void handleRecord(Record record) {
            //submit record to es and return
            // non blocking
        }
    
        public boolean blockTillAllRecordsAreProcessed(){
            // this should block until all records are processed
            // no clue how to implement this with a common es
        }
    
    }
    

    我能想到的唯一方法是为每个工作人员提供本地执行器服务,并为每个批次执行类似调用All的操作,这将稍微改变实现但完成工作。但我觉得应该有更好的方法来解决这个问题。

  • 共有1个答案

    幸弘光
    2023-03-14

    您可以使用CountdownLatch类来阻止,如下所示:

    public void processRecords(List<Record> records) {
     CountDownLatch latch = new CountDownLatch(records.size());
     for (Record record : records) {
         listener.handleRecord(record, latch);
     }
    
     //if 15 minutes has elapsed, run below code. This is blocking.
     listener.blockTillAllRecordsAreProcessed(latch)
     } 
    
    public class Listener {
     ExecutorService es;
     ...
     public void handleRecord(Record record, CountDownLatch latch) {
         //submit record to es and return
         // non blocking
         es.submit(()->{
            someSyncTask(record);
            latch.countDown();
            
        })
     }
    
     public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
         System.out.println("waiting for processes to complete....");
         try {
              //current thread will get notified if all chidren's are done 
              // and thread will resume from wait() mode.
              latch.await();
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
           }
    
       }
    
    

    在这里阅读更多:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

     类似资料:
    • 4.4.1.2 创建/使用公共服务 公共服务是应该由未指定的大量应用使用的服务。 有必要注意,它可能会收到恶意软件发送的信息(意图等)。 在使用公共服务的情况下,有必要注意,恶意软件可能会收到要发送的信息(意图等)。 下面展示了如何使用startService类型服务的示例代码。 要点(创建服务): 将导出属性显式设置为true。 小心并安全地处理接收到的意图。 返回结果时,请勿包含敏感信息。 A

    • 我刚开始学习Java多线程。我从这个简单的代码开始,但是似乎从未实现。 我希望“任务执行”结果应该被打印出来,但我得到了一个空控制台。 下面是我的简单类:

    • 我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅

    • 主要内容:Executor接口中的方法,实例接口是支持启动新任务的一个简单接口。 Executor接口中的方法 序号 方法 描述 1 在将来的某个时间执行给定的命令。 实例 以下程序显示了如何在基于线程的环境中接口的用法。 执行上面代码,得到如下结果 -

    • 我在玩java多线程代码。我创建了一个具有固定线程池的executor服务。我正在提交两个任务顺序。我试图用线程使第一个任务变得很长。我在想这两个任务将并行运行。然而,当我运行程序时,程序会等待一段时间,然后输出一个B,这意味着编译器首先完成了第一个任务,然后才执行第二个任务。事实上,我在期待,因为第二个任务是一个短任务,它会在第一个任务之前完成。有什么解释吗?

    • 我附上了一个应用程序的示例代码,它在我的Core i3 370M笔记本电脑上(Win 7 64bit,Java 1.8.0.4564bit)在大约20秒内重现了这个问题。这个应用程序读取识别文本蕴涵(RTE)语料库的XML文件,然后使用标准Java并发类同时解析所有句子。本地RTE XML文件的路径需要作为命令行参数给出。在我的测试中,我使用了以下公开的XML文件:http://www.nist.