当前位置: 首页 > 知识库问答 >
问题:

使用者/生产者异常处理

甘骞尧
2023-03-14

我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。

我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。

我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个文件里。

import java.math.BigInteger;
import java.util.Random;

public class ConsProd {

    static class CircularBuffer {

        private BigInteger[] buffer;
        //"pointers"
        private int read;
        private int write;

        public CircularBuffer(int size) {
            this.buffer = new BigInteger[size];
            this.read = 0;
            this.write = 0;
        }

        public boolean isFull() {
            for(int i = 0; i < buffer.length; i++) {
                if(buffer[i] == null)
                    return false;
            }
            return true;
        }

        public boolean isEmpty() {
            for(int i = 0; i < buffer.length; i++) {
                if(buffer[i] != null)
                    return false;
            }
            return true;    
        }

        public synchronized void put(BigInteger element) throws InterruptedException {
            while(isFull()){
                wait();
            }
            buffer[write] = element;
            write = (write+1)%buffer.length;
            notifyAll();
        }

        public synchronized BigInteger take() throws InterruptedException {
            while(isEmpty()){
                wait();
            }
            BigInteger temp = buffer[read];
            buffer[read] = null;
            read = (read+1)%buffer.length;
            notifyAll();
            return temp;
        }   
    }

    static class Consumer implements Runnable {

        private int id;
        private CircularBuffer buffer;

        public Consumer(int id, CircularBuffer b) {
            this.id = id;
            this.buffer = b;
        }

        private void consume(BigInteger e) {
            synchronized(e){
                System.out.println("consumer " + id + " retrieved: " + e);
                if (e.isProbablePrime(100)) {
                    System.out.println("     -----> probably prime!");
                }
            }
        }

        @Override
        public void run() {
            try { // TODO is this the right place to handle the exception? 
                while (!Thread.currentThread().isInterrupted()) {
                    BigInteger e = buffer.take();
                    consume(e);
                }
            } catch (InterruptedException e) { }
        }

    }

    static class Producer implements Runnable {

        private int id;
        private CircularBuffer buffer;

        public Producer(int id, CircularBuffer b) {
            this.id = id;
            this.buffer = b;
        }

        protected BigInteger produce() {
            BigInteger x = new BigInteger(10, new Random());
            System.out.println("producer " + id + " produced:  " + x.toString());
            return x;
        }

        @Override
        public void run() {

            try { // TODO is this the right place to handle the exception? 
                while (!Thread.currentThread().isInterrupted()) {
                    BigInteger e = produce();
                    buffer.put(e);
                }
            } catch (InterruptedException e) { }
        }
    }


    public static void main(String[] args) {

        CircularBuffer cbuf = new CircularBuffer(4);

        Thread t1 = new Thread(new Consumer(1, cbuf));
        Thread t2 = new Thread(new Consumer(2, cbuf));
        Thread t3 = new Thread(new Consumer(3, cbuf));

        Thread t4 = new Thread(new Producer(1, cbuf));
        Thread t5 = new Thread(new Producer(2, cbuf));

        t1.start();
        t2.start();
        t3.start();

        t4.start();
        t5.start();
    }
}

共有1个答案

孙斌
2023-03-14

这是旧的,但我想我应该插话。将来,这个问题似乎对codereview.stackexchange.com.更好

我会很高兴每个改进代码的建议。

我的第一反应是你应该使用ExecutorService,而不是消费者线程和循环。制作人可以保持你写的一样,尽管我怀疑它是多线程的,会对你有所帮助。你的 e.isProbablePrime() 方法比新的 BigInteger(10, new Random()); 慢很多个数量级。

由于生产者和消费者之间的速度差异,您必须使用有限的BlockingQueue创建ExecutorService。如下所示:

threadPool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                       new LinkedBlockingQueue<Runnable>(100));

然后,当你的生产者创建 BigInteger时,它会做一个:

threadPool.put(new Consumer(randomBigInteger());

然后,您的消费者将完全没有循环。这将处理线程、共享缓冲区和所有同步代码。你不需要管理它。您需要自己处理恢复任何结果,尽管您也可以提交<code>Callable

如果你不能使用ExecutorService的东西,那么你的代码看起来相当不错,尽管我会使用LinkedBlockingQueue而不是CircularBuffer

 类似资料:
  • 我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。

  • 问题内容: 这是处理生成器中引发的异常的后续操作,并讨论了一个更一般的问题。 我有一个功能,可以读取不同格式的数据。所有格式都是面向行或记录的,每种格式都有一个专用的解析功能,可以作为生成器来实现。因此,主读取函数获得一个输入和一个生成器,该生成器从输入中读取其各自的格式并将记录传递回主函数: 哪里是这样的: 我面临的问题是,尽管可能引发异常(例如,从流中读取时),但它不知道如何处理它。负责处理异

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 我们的生产环境中出现了随机的和: 我们偶尔会在我的生产者日志中得到这个异常: 主题:XXXXXX:5608 ms的过期记录自批量创建加上逗留时间以来已经过去。 此类错误消息中的毫秒数不断变化。有时是5秒,有时是13秒! 我们很少能得到: 集群由3个经纪人和3个动物园管理员组成。生产者服务器和Kafka集群在同一个网络中。 我在打同步电话。有一个web服务,多个用户请求调用它来发送数据。Kafka

  • 我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了: 然而,当我在另一个项目(数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误: 在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至