当前位置: 首页 > 知识库问答 >
问题:

主线程使用者和其他线程生产者

公孙国兴
2023-03-14

下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordToRead都是属于主类和主线程的变量。

    ExecutorService service = Executors.newFixedThreadPool(nThreads);
    while(nThreads > 0) {
        nThreads--;
        service.execute(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub

                do {
                    int respCode = 0;
                    int RecordsToRead = div;
                    JSONObject jsObj = new JSONObject();
                    jsObj.put("pagesize", pageSize);
                    jsObj.put("start", recordsRead);
                    jsObj.put("searchinternalid", searchInternalId);

                    try {
                        boolean status = req.invoke(jsObj); 
                        respCode = req.getResponseCode();

                    } catch (Exception e) {         
                        req.reset();
                        e.printStackTrace();
                        return true;
                    }
                    JSONObject jsResp = req.getResponseJson();
                    //here jsResp will be added to ArrayBlockingQueue.

                    req.reset();
                }while(!isError && !isMaxLimit && recordsRead < RecordsToRead);

            }

        });
    }

在这个循环之后将是主线程读取队列的代码。如何为所有线程设置recordsRead和recordToread。

以及如何使主线程等待直到至少有一个线程在队列中插入一个对象。

共有1个答案

陈康胜
2023-03-14

我在你的定义中看到了两个问题。第一个问题是执行并行块计算,第二个问题是创建一个连续的流水线。让我们从第一个问题开始。为了实现具有预定义大小的并行计算,fmpv的最佳选择是使用fork-join框架。不仅是因为性能(工作窃取确实有效),也是因为代码更简单。但是,由于您被限制为3个线程对我来说,它似乎也有效地直接使用线程。简单地说,你想要的东西我可以通过这种方式来实现:

    final int chunkSize = 300;
    //you can also use total amount of job
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber
    final int threadsNumber = 3;

    Thread[] threads = new Thread[threadsNumber];

    for (int ii = 0; ii < threadsNumber; ii++) {
        final int i = ii;

        threads[ii] = new Thread(() -> {
           //count your variable according the volume
            // for example you can do so
            int chunkStart = i * chunkSize; 
            int chunkEnd = chunkStart + chunkSize;
            for(int j = chunkStart; j < chunkEnd; j++) {
              //object creation with necessary proprs
              //offer to queue here
            }
        });

        threads[ii].start();
    }

    //your code here
    //take here

    for (int ii = 0; ii < threadsNumber; ii++) {
        try {
         //this part is only as example
         //you do not need it                
         //here if you want you can also w8 for completion of all threads
            threads[ii].join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

关于消费的第二个问题。对于这个puprose,您可以使用ConcurrentLinkedBlockingQueue(http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)。在生产者线程中报价,主要采用取用方法

但老实说,我还是不明白你问题的原因。您想创建连续的管道还是它只是一次性的计算?

 类似资料:
  • 问题内容: 我想看看使用多线程生产者而不是单线程生产者会有多少时间差异。我在本地计算机上设置了ActiveMQ,编写了生产者类,该类将初始化并在其构造函数中启动JMS连接。我将消息限制设置为3M,将所有消息推送到ActiveMQ大约花费了50秒。我只发送了一个字符串“ hello world” 3M次。 然后,我使用了相同的生产者对象(一个连接但有多个会话),并使用线程大小为8的ExecutorS

  • 来自文档:http://docs.python.org/2/library/thread 让我们在这里只讨论非守护进程线程。因为第一个引号没有特别提到非守护进程线程,所以我假设,如果主线程退出,即使是非守护进程线程也应该被杀死。然而,第二句引文却表明了另一种情况。事实上,当主线程退出时,非守护进程线程确实不会被杀死。那么,这里的第一个引用有什么意义呢?

  • 我在使用ArrayBlockingQueue时遇到了生产者和消费者的情况。如果使用者线程面临异常,如何停止生产者线程。我需要生产者停止等待队列是空的。我已经诱导了一个强制运行时异常。但是程序不会退出。生产者一直在等待,因为队列是空的。有人能帮忙吗

  • 注意:我工作了很多时间并研究了google和stackoverflow,但我找不到答案。 我用线。sleep(),它冻结了所有其他JDialog、JFrame和线程。 我的示例代码: 在这种情况下,JDialog无法正确显示: inccorect出现jdialog 但它必须符合这一点: true出现jdialog 我怎样才能解决这个问题。我想让主线程等待另一个线程。有人可以纠正我的示例代码,或者在

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别