我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。
我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。
我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。
我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做?
下面是一些代码:
搜索机制:
public class SearchingAlgorithm implements Runnable {
private final File file;
private BlockingQueue<File> queue;
private ExecutorService executor;
public SearchingAlgorithm(File fileName, BlockingQueue<File> queue) {
this.file = fileName;
this.queue = queue;
this.executor = Executors.newWorkStealingPool();
}
@Override
public void run() {
try {
searchDeep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void searchDeep() throws InterruptedException {
File[] files = file.listFiles();
if (files != null) {
for (File fil : files) {
if (fil.isDirectory()) {
executor.submit(new SearchingAlgorithm(fil, this.queue));
} else {
this.queue.add(fil);
}
}
}
}
}
打印机:
public class ContainingCheckAlgorithm implements Runnable {
private BlockingQueue<File> queue;
// private ExecutorService executor;
private String keyWord;
public ContainingCheckAlgorithm(BlockingQueue<File> queue, String keyWord) {
this.queue = queue;
this.keyWord = keyWord;
// executor = Executors.newFixedThreadPool(2);
}
@Override
public void run() {
try {
printFile();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void printFile() throws InterruptedException {
while (true) {
File takeFile = queue.take();
String fileName = takeFile.getAbsolutePath()
.toLowerCase();
boolean isContainingKeyWord = fileName.contains(keyWord.toLowerCase());
if (isContainingKeyWord) {
System.out.println(takeFile.getAbsolutePath());
}
}
}
}
主要测试类:
public class MainClass {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<File> queue = new LinkedBlockingQueue<>();
File fileName = new File("C:/");
SearchingAlgorithm sa = new SearchingAlgorithm(fileName, queue);
executor.submit(sa);
ContainingCheckAlgorithm ca = new ContainingCheckAlgorithm(queue, "Slipknot");
executor.submit(ca);
executor.shutdown();
}
}
如你所说,我试着这样做:
搜索算法与其他搜索算法共享线程池。
搜索:
public class SearchingAlgorithm implements Runnable {
private final File file;
private BlockingQueue<File> queue;
private ExecutorService executor;
public SearchingAlgorithm(File fileName, BlockingQueue<File> queue, ExecutorService executor) {
this.file = fileName;
this.queue = queue;
this.executor = executor;
}
@Override
public void run() {
try {
searchDeep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void searchDeep() throws InterruptedException {
File[] files = file.listFiles();
if (files != null) {
for (File fil : files) {
if (fil.isDirectory()) {
executor.submit(new SearchingAlgorithm(fil, this.queue, executor));
} else {
this.queue.add(fil);
}
}
}
}
现在 ContainingCheckAlgorith 需要与主类共享 CountDownLatch,因为我需要一些机制来关闭主类中的线程池。正如你所说,它也使用pool(超时),我的线程终于完成了他们的工作。
检查中
public class ContainingCheckAlgorithm implements Runnable {
private BlockingQueue<File> queue;
private String keyWord;
private CountDownLatch latch;
public ContainingCheckAlgorithm(BlockingQueue<File> queue, String keyWord, CountDownLatch latch) {
this.queue = queue;
this.keyWord = keyWord;
this.latch = latch;
}
@Override
public void run() {
try {
printFile();
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void printFile() throws InterruptedException {
File takeFile;
while ((takeFile = queue.poll(1, TimeUnit.SECONDS)) != null) {
String fileName = takeFile.getName()
.toLowerCase();
boolean isContainingKeyWord = fileName.contains(keyWord.toLowerCase());
if (isContainingKeyWord) {
System.out.println(takeFile.getAbsolutePath());
}
}
}
主要:
public class MainClass {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
BlockingQueue<File> queue = new LinkedBlockingQueue<>();
CountDownLatch latch = new CountDownLatch(1);
File fileName = new File("C:/");
SearchingAlgorithm sa = new SearchingAlgorithm(fileName, queue, executor);
executor.submit(sa);
ContainingCheckAlgorithm ca = new ContainingCheckAlgorithm(queue, "Slipknot", latch);
executor.submit(ca);
latch.await();
executor.shutdown();
}
这看起来很奇怪,但我想知道如果:
>
超过1个线程将作为ContainingCheck算法运行?
搜索算法将搜索文件超过 1 秒,并且包含检查算法完成工作?显然,我可以将超时更改为 2 秒甚至更多,但我们总是试图优化我们的程序。
将整个工作分为两个阶段。在第一阶段,如果队列为空,SearchingAlgorithm的工作和ContainingCheckAlgorithm等待新的作业。在第二阶段,所有 SearchingAlgorithm 实例都已完成,如果发现队列为空,则 ContainingCheckAlgorithm 退出。为了发现队列是否为空,ContainingCheckAlgorithm使用queue.poll(timeout)而不是queue.take()。
而且您不需要为每个搜索算法创建新的线程池。
问题内容: 我正在研究有关Java中线程的生产者和消费者设计模式,最近我在Java 5中进行了探索,引入Java 5中引入了BlockingQueue数据结构。现在,它变得更加简单,因为BlockingQueue通过引入阻塞方法隐式地提供了此控件。 put()和take()。现在,您无需使用等待和通知即可在生产者和消费者之间进行通信。如果有界队列,则如果Queue已满,BlockingQueue
问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别
问题内容: 我正在为标准Java系统工作,对生产者来说,这有严格的时序要求(1/100秒的毫秒)。 我有一个生产者将内容放置在阻塞队列中,然后一个消费者使用了该内容并将其转储到文件中。当数据不可用时,使用者将阻塞。 显然,阻塞队列是合适的接口,但是如果我想 最小化生产者的成本, 我应该选择哪种实际实现?当我将内容放入队列时,我希望尽可能少地进行诸如锁定和分配之类的事情,而且我不介意消费者是否需要等
我们使用activemq作为Java独立应用程序的消息队列。我的问题是,基于activemq web控制台,队列有一定数量的消息排队和出列。但是,根据我在代码中添加的sysout语句,应用程序消耗的消息数似乎少于activemq web控制台上显示的消息数。例如,在activemq控制台上,没有。排队和出列的消息约为1800条。但是,在控制台上显示的出列消息数(我每接收一条消息就增加一个计数器)只
我正在使用(java)LinkedBlockingQueue创建资源池,其中 资源元素是等价的,属于一个池,它们的排序是无关紧要的。 消费者是竞争线程,一次抓取一个资源,使用“拉”操作,处理资源,然后通过“添加”操作将其返回给池。 当特定资源被消费者线程使用时,它不得对其他消费者线程可用。 问题是:LinkedBlockingQueue不会对等待的消费者进行FIFO,而且服务水平也不统一。 关于这
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要