当前位置: 首页 > 知识库问答 >
问题:

如何在使用者线程面临异常时停止生产者线程

姬康平
2023-03-14

我在使用ArrayBlockingQueue时遇到了生产者和消费者的情况。如果使用者线程面临异常,如何停止生产者线程。我需要生产者停止等待队列是空的。我已经诱导了一个强制运行时异常。但是程序不会退出。生产者一直在等待,因为队列是空的。有人能帮忙吗

public class ServiceClass implements Runnable{

    private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
    private static final int CONSUMER_COUNT = 1;
    private boolean isConsumerInterrupted = false;

    private boolean isConsumer = false;
    private static boolean producerIsDone = false;

    public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
        this.isConsumer = consumer;
        this.isConsumerInterrupted = isConsumerInterrupted;
    }

    public static void main(String[] args) {

        long startTime = System.nanoTime();

        ExecutorService producerPool = Executors.newFixedThreadPool(1);
        producerPool.submit(new ServiceClass(false,false)); // run method is
                                                         // called   
        // create a pool of consumer threads to parse the lines read
        ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumerPool.submit(new ServiceClass(true,false)); // run method is
                                                            // called
        }
        producerPool.shutdown();
        consumerPool.shutdown();

        while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
        }

        long endTime = System.nanoTime();
        long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
        System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");

    }



    @Override
    public void run() {
        if (isConsumer) {
            consume();
        } else {
            readFile(); //produce data by reading a file
        }
    }

    private void readFile() {
        //Path file = Paths.get("c:/temp/my-large-file.csv");
        try
        {

            for(int i =0;i<10000;i++) {
                if(isConsumerInterrupted) {
                    break;
                }
                processQueue.put(i);
                System.out.println("produced:" + i+"------"+processQueue.size());

            }
            //Java 8: Stream class


        } catch (Exception e){
            e.printStackTrace();
        }

        producerIsDone = true; // signal consumer
        System.out.println(Thread.currentThread().getName() + " producer is done");
    }

    private void consume() {
        try {
            while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {

                System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
                throw new RuntimeException();

                //System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());                
            }
            System.out.println(Thread.currentThread().getName() + " consumer is done");
        } catch (Exception e) {
            isConsumerInterrupted=true;
        }


    }




}

共有1个答案

束敏学
2023-03-14

您正在使用标志isconsumerinterrupted来终止生成器线程。这是不对的。消费者不使用队列中的元素,生产者一直生产,直到队列满了,然后开始阻塞,直到队列非满。然后,当使用者抛出RuntimeException时,它会设置标志,而生产者线程没有机会检查标志,因为没有一个使用者使用队列中的元素,这样生产者就可以从等待状态中出现。一种选择是使用将来,并在使用者抛出与设置标志相反的异常时取消它。由于processqueue.put响应中断,因此它将成功终止生成器线程。如果在等待期间中断,则抛出interruptedexception。看起来是这样的。

private static Future<?> producerFuture = null;
public static void main(String[] args) {

    // Remainder omitted.
    producerFuture = producerPool.submit(new ServiceClass(false, false)); 
    // ...
}

private void consume() {
    try {
        // ...
    } catch (Exception e) {
        producerFuture.cancel(true);
    }
}
 类似资料:
  • 下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordT

  • 我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 问题内容: 我想看看使用多线程生产者而不是单线程生产者会有多少时间差异。我在本地计算机上设置了ActiveMQ,编写了生产者类,该类将初始化并在其构造函数中启动JMS连接。我将消息限制设置为3M,将所有消息推送到ActiveMQ大约花费了50秒。我只发送了一个字符串“ hello world” 3M次。 然后,我使用了相同的生产者对象(一个连接但有多个会话),并使用线程大小为8的ExecutorS

  • 问题内容: 我有一个单一的线程生产者,它创建了一些任务对象,然后将它们添加到一个(大小固定的)对象中。 我还启动了一个多线程使用者。这是作为固定线程池()构建的。然后,我向该threadPool提交一些ConsumerWorker实例,每个ConsumerWorker都引用了上述ArrayBlockingQueue实例。 每个这样的Worker将在队列中执行并处理任务。 我的问题是,什么时候不再需