当前位置: 首页 > 知识库问答 >
问题:

Java。带阻塞队列的消费者生产者。搜索工具

诸葛雅达
2023-03-14

我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。

我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。

我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。

我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做?

下面是一些代码:

搜索机制:

public class SearchingAlgorithm implements Runnable {

private final File file;
private BlockingQueue<File> queue;
private ExecutorService executor;

public SearchingAlgorithm(File fileName, BlockingQueue<File> queue) {
    this.file = fileName;
    this.queue = queue;
    this.executor = Executors.newWorkStealingPool();
}

@Override
public void run() {
    try {
        searchDeep();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private void searchDeep() throws InterruptedException {
    File[] files = file.listFiles();
    if (files != null) {
        for (File fil : files) {
            if (fil.isDirectory()) {
                executor.submit(new SearchingAlgorithm(fil, this.queue));
            } else {
                this.queue.add(fil);
            }
        }
    }
}

}

打印机:

public class ContainingCheckAlgorithm implements Runnable {

private BlockingQueue<File> queue;
// private ExecutorService executor;
private String keyWord;

public ContainingCheckAlgorithm(BlockingQueue<File> queue, String keyWord) {
    this.queue = queue;
    this.keyWord = keyWord;
    // executor = Executors.newFixedThreadPool(2);
}

@Override
public void run() {
    try {
        printFile();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

private void printFile() throws InterruptedException {
    while (true) {
        File takeFile = queue.take();
        String fileName = takeFile.getAbsolutePath()
                .toLowerCase();
        boolean isContainingKeyWord = fileName.contains(keyWord.toLowerCase());

        if (isContainingKeyWord) {
            System.out.println(takeFile.getAbsolutePath());
        }
    }
}

}

主要测试类:

public class MainClass {

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    BlockingQueue<File> queue = new LinkedBlockingQueue<>();

    File fileName = new File("C:/");

    SearchingAlgorithm sa = new SearchingAlgorithm(fileName, queue);
    executor.submit(sa);

    ContainingCheckAlgorithm ca = new ContainingCheckAlgorithm(queue, "Slipknot");
    executor.submit(ca);

    executor.shutdown();
}

}

共有2个答案

冉子石
2023-03-14

如你所说,我试着这样做:

搜索算法与其他搜索算法共享线程池。

搜索:

public class SearchingAlgorithm implements Runnable {

private final File file;
private BlockingQueue<File> queue;
private ExecutorService executor;

public SearchingAlgorithm(File fileName, BlockingQueue<File> queue, ExecutorService executor) {
    this.file = fileName;
    this.queue = queue;
    this.executor = executor;
}

@Override
public void run() {
    try {
        searchDeep();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private void searchDeep() throws InterruptedException {
    File[] files = file.listFiles();
    if (files != null) {
        for (File fil : files) {
            if (fil.isDirectory()) {
                executor.submit(new SearchingAlgorithm(fil, this.queue, executor));
            } else {
                this.queue.add(fil);
            }
        }
    }
}

现在 ContainingCheckAlgorith 需要与主类共享 CountDownLatch,因为我需要一些机制来关闭主类中的线程池。正如你所说,它也使用pool(超时),我的线程终于完成了他们的工作。

检查中

public class ContainingCheckAlgorithm implements Runnable {

private BlockingQueue<File> queue;
private String keyWord;
private CountDownLatch latch;

public ContainingCheckAlgorithm(BlockingQueue<File> queue, String keyWord, CountDownLatch latch) {
    this.queue = queue;
    this.keyWord = keyWord;
    this.latch = latch;
}

@Override
public void run() {
    try {
        printFile();
        latch.countDown();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

private void printFile() throws InterruptedException {
    File takeFile;
    while ((takeFile = queue.poll(1, TimeUnit.SECONDS)) != null) {
        String fileName = takeFile.getName()
                .toLowerCase();
        boolean isContainingKeyWord = fileName.contains(keyWord.toLowerCase());

        if (isContainingKeyWord) {
            System.out.println(takeFile.getAbsolutePath());
        }
    }
}

主要:

public class MainClass {

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    BlockingQueue<File> queue = new LinkedBlockingQueue<>();
    CountDownLatch latch = new CountDownLatch(1);

    File fileName = new File("C:/");

    SearchingAlgorithm sa = new SearchingAlgorithm(fileName, queue, executor);
    executor.submit(sa);

    ContainingCheckAlgorithm ca = new ContainingCheckAlgorithm(queue, "Slipknot", latch);
    executor.submit(ca);

    latch.await();
    executor.shutdown();
}

这看起来很奇怪,但我想知道如果:

>

  • 超过1个线程将作为ContainingCheck算法运行?

    搜索算法将搜索文件超过 1 秒,并且包含检查算法完成工作?显然,我可以将超时更改为 2 秒甚至更多,但我们总是试图优化我们的程序。

  • 万高轩
    2023-03-14

    将整个工作分为两个阶段。在第一阶段,如果队列为空,SearchingAlgorithm的工作和ContainingCheckAlgorithm等待新的作业。在第二阶段,所有 SearchingAlgorithm 实例都已完成,如果发现队列为空,则 ContainingCheckAlgorithm 退出。为了发现队列是否为空,ContainingCheckAlgorithm使用queue.poll(timeout)而不是queue.take()。

    而且您不需要为每个搜索算法创建新的线程池。

     类似资料:
    • 问题内容: 我正在研究有关Java中线程的生产者和消费者设计模式,最近我在Java 5中进行了探索,引入Java 5中引入了BlockingQueue数据结构。现在,它变得更加简单,因为BlockingQueue通过引入阻塞方法隐式地提供了此控件。 put()和take()。现在,您无需使用等待和通知即可在生产者和消费者之间进行通信。如果有界队列,则如果Queue已满,BlockingQueue

    • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

    • 问题内容: 我正在为标准Java系统工作,对生产者来说,这有严格的时序要求(1/100秒的毫秒)。 我有一个生产者将内容放置在阻塞队列中,然后一个消费者使用了该内容并将其转储到文件中。当数据不可用时,使用者将阻塞。 显然,阻塞队列是合适的接口,但是如果我想 最小化生产者的成本, 我应该选择哪种实际实现?当我将内容放入队列时,我希望尽可能少地进行诸如锁定和分配之类的事情,而且我不介意消费者是否需要等

    • 我们使用activemq作为Java独立应用程序的消息队列。我的问题是,基于activemq web控制台,队列有一定数量的消息排队和出列。但是,根据我在代码中添加的sysout语句,应用程序消耗的消息数似乎少于activemq web控制台上显示的消息数。例如,在activemq控制台上,没有。排队和出列的消息约为1800条。但是,在控制台上显示的出列消息数(我每接收一条消息就增加一个计数器)只

    • 我正在使用(java)LinkedBlockingQueue创建资源池,其中 资源元素是等价的,属于一个池,它们的排序是无关紧要的。 消费者是竞争线程,一次抓取一个资源,使用“拉”操作,处理资源,然后通过“添加”操作将其返回给池。 当特定资源被消费者线程使用时,它不得对其他消费者线程可用。 问题是:LinkedBlockingQueue不会对等待的消费者进行FIFO,而且服务水平也不统一。 关于这

    • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要