当前位置: 首页 > 知识库问答 >
问题:

中断线程中的阻塞队列

欧阳洲
2023-03-14

我正在编写一个有3个线程的程序。一个读取一个文本文件并将单词输入到大小为2的ArraylistBlockingQueue中。下一个获取该列表并反转其中的每个其他单词。最后一个线程获取单词并将它们写入一个新的文本文件。

我所有的东西都在工作,除了我不知道如何中断和停止我的线程。程序写入文本文件,但从未结束。

主要方法

 JFileChooser chooser = new JFileChooser();
    File selectedFile = null;
    File outFile = new File("output.txt");

    int returnValue = chooser.showOpenDialog(null);
    if (returnValue == JFileChooser.APPROVE_OPTION) {
        selectedFile = chooser.getSelectedFile();

    }




    BlockingQueue text = new ArrayBlockingQueue(2);
    BlockingQueue out = new ArrayBlockingQueue(2);

    Thread input = new Thread(new inputClass(selectedFile, text));
    Thread reverse = new Thread(new reverseClass(text, out));
    Thread output = new Thread(new outputClass(out, outFile));


    chooser.setSelectedFile(outFile);
    if (chooser.showSaveDialog(chooser) == JFileChooser.APPROVE_OPTION) {

        input.start();
        reverse.start();
        output.start();


    }

输入

@Override
public void run() {

    try {
        Scanner s = new Scanner(f);

        while (isRunning) {

            try {
                if(s.hasNext()){
                text.put(s.next());
                } else {
               text.put(END);
            }
            } catch (InterruptedException ex) {

                Logger.getLogger(inputClass.class.getName()).log(Level.SEVERE, null, ex);
            } 

        }

    } catch (FileNotFoundException ex) {
        Logger.getLogger(inputClass.class.getName()).log(Level.SEVERE, null, ex);
    }

反向类@重写公共void run(){

    String original;
    String temp = "";
    String character = "";
    int count = 1; // keeps track of whether or not a word should be reversed

    while (isRunning) {

        try {

            original = text.take();
            if(original.equalsIgnoreCase(END)){
                Thread.currentThread().interrupt();
            }
            int length = original.length() - 1;
            //if count is even then the word should be reversed.
            if ((count % 2) == 0) {
                // reverses the original string if a ? or . appears
                if (original.contains("?") || original.contains(".")) {
                    character = original.charAt(length) + "";
                    for (int i = (length - 1); i >= 0; i--) {
                        temp = temp + original.charAt(i);

                    }
                    out.put(temp + character);
                    temp = "";
                    character = "";
                    // reverses the orgininal string if no ? or . appears
                } else {

                    for (int i = length; i >= 0; i--) {
                        temp = temp + original.charAt(i);

                    }
                    out.put(temp);
                    temp = "";
                }
                count++;
            } else {
                out.put(original);
                count++;
            }


        } catch (InterruptedException ex) {

            Logger.getLogger(reverseClass.class.getName()).log(Level.SEVERE, null, ex);

        }
    }
    try {
        out.put(END);
    } catch (InterruptedException ex) {
        Logger.getLogger(reverseClass.class.getName()).log(Level.SEVERE, null, ex);
    }

}

输出代码@覆盖公共无效run(){

    while (isRunning) {

        String s = null;
        try {
            s = out.take();
        } catch (InterruptedException ex) {
            Logger.getLogger(outputClass.class.getName()).log(Level.SEVERE, null, ex);
        }

        try {

            BufferedWriter writer = new BufferedWriter(new FileWriter(file, true));

            try {
                if (s.equalsIgnoreCase(END)) {
                    Thread.currentThread().interrupt();
                }
                writer.write(s + " ");


            }  finally {
                writer.flush();
            }

        } catch (IOException ex) {
            Logger.getLogger(outputClass.class.getName()).log(Level.SEVERE, null, ex);

        }
    }

共有3个答案

蒋奕
2023-03-14

你的代码已经完成一半了。。。

对于在take()方法中阻塞的线程,您需要:

  1. 将“isRunning”(正在运行)标志设置为“false”(错误)
  2. 调用线程。线程对象上的中断()

您可以通过在InterruptedException的处理程序中将其设置为false来避免暴露isRrun标志。

对于从Scanner读取时阻塞的线程,Thread.interrupt()应该会导致InterruptedIOException。如果没有,那么在Scanner正在使用的流上调用关闭()应该就足够了。它需要一点重组;即使用ReaderInputStream而不是File对象构建Scanner

戚育
2023-03-14

由于您一次读取一个单词,因此您可以通过阻塞队列发送一个标志。发送字符串文字“此队列已终止”就足够了。由于您一次只读取一个单词,因此无法在您的word反向程序中测试此字符串。

一旦收到此信息,您就可以中断字反转器。然后将相同的文字发送给作者。写入程序收到标志后,也可以关闭写入程序。

A、 K.索蒂里奥斯的“毒丸”。

反向器类示例:

/**
 * Called when being executed. Reverses a word by taking from intake and
 * places the reversed word into store
 */
@Override
public void run() {
    boolean isInterrupted = false;
    while (!isInterrupted) {
        try {
            StringBuilder str = new StringBuilder(intake.take());

            //Exit condition
            if (str.toString().equalsIgnoreCase(END_FLAG)) {
                Thread.currentThread().interrupt();
            }

            //If it is a word to be reversed, then reverse it
            if (oddWord % 2 == 1) {
                str = reverseWord(str);
            }

            //Put word in queue and increment counter
            store.put(str);
            ++oddWord;
        } catch (InterruptedException ex) {
            isInterrupted = true;
        }
    }

    //Puts pill into queue when main body is done
    try {
        store.put(END_FLAG);
    } catch (InterruptedException ex) {
        System.err.printf("Error setting flag in store.%nWordReverser%n%s%n", ex);
    }
}

Writer类中的Myrun()方法

    /**
     * Executes when being called in a thread
     */
    @Override
    public void run() {
        boolean isInterrupted = false;

        try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) {

            //Continue writing until the thread is interrupted
            while (!isInterrupted) {
                CharSequence word = in.take();

                if (word.toString().equalsIgnoreCase(END_FLAG)) {
                    isInterrupted = true;
                } else {
                    writer.write(word + " ");
                }
            }
        } catch (InterruptedException | IOException ex) {
            System.err.printf("Error writing to output file!%n%s%n", ex);
        }
    }
}
裘嘉木
2023-03-14

来自Javadoc:

BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作来指示不会添加更多项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是生产者插入特殊的流结束或有毒对象,这些对象在被消费者获取时会相应地解释。

我建议将值null排队以指示流已结束;读者需要检查读取的值是否为null,如果是,则终止。

 类似资料:
  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。 阻塞队列提供了四种处理方法 方法\处

  • 编辑: 主要问题:为什么只有一个线程抛出interruptedexception,而两个线程都阻塞在条件上。await 所以下面的代码只是我创建的一个示例。主要的问题是开发一个生产者-消费者实现,在这个实现中,我必须创建一个模拟类,它产生两种线程:客户线程和厨师线程,这两种线程是基于可重入锁进行同步的。在执行一些操作(客户添加订单,厨师执行服务这些订单)后,我调用客户线程上的join以确保所有订单

  • 问题内容: 我有一个经典的问题,线程将事件推送到第二个线程的传入队列。仅这次,我对性能非常感兴趣。我要实现的是: 我想要并发访问队列,生产者推送,接收者弹出。 当队列为空时,我希望消费者阻止队列,等待生产者。 我的第一个想法是使用,但是我很快意识到它不是并发的,并且会降低性能。另一方面,我现在使用,但仍要为每个出版物支付/ 的费用。由于使用者在找到空队列时不会阻塞,因此我必须进行同步并处于锁定状态

  • 如果队列已满,ArrayBlockingQueue将阻止生产者线程;如果队列为空,ArrayBlockingQueue将阻止消费者线程。 这种阻塞的概念是否与多线程的思想背道而驰?如果我有一个“主”线程,并假设我想将所有“日志记录”活动委托给另一个线程。因此,基本上在我的主线程内,我创建了一个Runnable来记录输出,并将其放在ArrayBlockingQueue上。这样做的全部目的是让“主”线

  • 假设,我有一个服务A,它有一个线程池执行器来调用服务B。我们可以用自己的值设置该池的核心池大小和队列。现在,服务B对请求的响应速度很慢,因为服务A线程池中的活动线程增加,导致阻塞队列大小增加。如何防止服务A的队列大小增加? 背景