我在使用ArrayBlockingQueue时遇到了生产者和消费者的情况。如果使用者线程面临异常,如何停止生产者线程。我需要生产者停止等待队列是空的。我已经诱导了一个强制运行时异常。但是程序不会退出。生产者一直在等待,因为队列是空的。有人能帮忙吗
public class ServiceClass implements Runnable{
private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
private static final int CONSUMER_COUNT = 1;
private boolean isConsumerInterrupted = false;
private boolean isConsumer = false;
private static boolean producerIsDone = false;
public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
this.isConsumer = consumer;
this.isConsumerInterrupted = isConsumerInterrupted;
}
public static void main(String[] args) {
long startTime = System.nanoTime();
ExecutorService producerPool = Executors.newFixedThreadPool(1);
producerPool.submit(new ServiceClass(false,false)); // run method is
// called
// create a pool of consumer threads to parse the lines read
ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
for (int i = 0; i < CONSUMER_COUNT; i++) {
consumerPool.submit(new ServiceClass(true,false)); // run method is
// called
}
producerPool.shutdown();
consumerPool.shutdown();
while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
}
long endTime = System.nanoTime();
long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");
}
@Override
public void run() {
if (isConsumer) {
consume();
} else {
readFile(); //produce data by reading a file
}
}
private void readFile() {
//Path file = Paths.get("c:/temp/my-large-file.csv");
try
{
for(int i =0;i<10000;i++) {
if(isConsumerInterrupted) {
break;
}
processQueue.put(i);
System.out.println("produced:" + i+"------"+processQueue.size());
}
//Java 8: Stream class
} catch (Exception e){
e.printStackTrace();
}
producerIsDone = true; // signal consumer
System.out.println(Thread.currentThread().getName() + " producer is done");
}
private void consume() {
try {
while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {
System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
throw new RuntimeException();
//System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());
}
System.out.println(Thread.currentThread().getName() + " consumer is done");
} catch (Exception e) {
isConsumerInterrupted=true;
}
}
}
您正在使用标志isconsumerinterrupted
来终止生成器线程。这是不对的。消费者不使用队列中的元素,生产者一直生产,直到队列满了,然后开始阻塞,直到队列非满。然后,当使用者抛出RuntimeException时,它会设置标志,而生产者线程没有机会检查标志,因为没有一个使用者使用队列中的元素,这样生产者就可以从等待状态中出现。一种选择是使用将来,并在使用者抛出与设置标志相反的异常时取消它。由于processqueue.put
响应中断,因此它将成功终止生成器线程。如果在等待期间中断,则抛出interruptedexception
。看起来是这样的。
private static Future<?> producerFuture = null;
public static void main(String[] args) {
// Remainder omitted.
producerFuture = producerPool.submit(new ServiceClass(false, false));
// ...
}
private void consume() {
try {
// ...
} catch (Exception e) {
producerFuture.cancel(true);
}
}
下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordT
我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个
问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别
问题内容: 我想看看使用多线程生产者而不是单线程生产者会有多少时间差异。我在本地计算机上设置了ActiveMQ,编写了生产者类,该类将初始化并在其构造函数中启动JMS连接。我将消息限制设置为3M,将所有消息推送到ActiveMQ大约花费了50秒。我只发送了一个字符串“ hello world” 3M次。 然后,我使用了相同的生产者对象(一个连接但有多个会话),并使用线程大小为8的ExecutorS
问题内容: 我有一个单一的线程生产者,它创建了一些任务对象,然后将它们添加到一个(大小固定的)对象中。 我还启动了一个多线程使用者。这是作为固定线程池()构建的。然后,我向该threadPool提交一些ConsumerWorker实例,每个ConsumerWorker都引用了上述ArrayBlockingQueue实例。 每个这样的Worker将在队列中执行并处理任务。 我的问题是,什么时候不再需