我试图用Java8中的流创建一个生产者多个消费者模型。我正在从数据库资源中读取和处理数据,我想以流式方式处理它们(不能将整个资源读取到内存中)。
class SimpleIterator<Data> implements Iterator<Data>{
private volatile Cursor cursor;
public SimpleIterator(Cursor cursor){
this.cursor = cursor;
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Data next() {
return cursor.next();
}
}
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
resultStream.parallel().forEach(data->processData(data));
public processData(Data data){
//heavy operation
}
我正在处理一个问题,我需要在两个流上做一个完整的外部联接。问题似乎相似。我所做的是插入两个阻塞队列来缓冲我的输入。我认为您可以使用一个阻塞队列来执行类似的操作,将单个流拆分为多个流,而无需并行源流。
我提出的解决方案可以在下面找到。我还没有测试连接两个流的解决方案,所以我不确定这是否有效。AbstractSpliterator类有一个TrySplit的实现;trySplit上的注释信息丰富。类的最后一个方法从拆分器实现中构造一个可并行的流。
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
final T EOS = null; // Just a stub -- can't put a null in BlockingQueue
private final BlockingQueue<T> queue;
private final Thread thread;
// An implementation of Runnable that fills a queue from a stream
private class Filler implements Runnable {
private final Stream<T> stream;
private final BlockingQueue<T> queue;
private Filler(Stream<T> stream, BlockingQueue<T> queue) {
this.stream = stream;
this.queue = queue;
}
@Override
public void run() {
stream.forEach(x -> {
try {
// Blocks if the queue is full
queue.put(x);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Stream is drained put end of stream marker.
try {
queue.put(EOS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
super(estSize, characteristics);
queue = new ArrayBlockingQueue<T>(1024);
// Fill the queue from a separate thread (may want to externalize this).
thread = new Thread(new Filler(srcStream, queue));
thread.start();
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
T value = queue.take(); // waits (blocks) for entries in queue
// If end of stream marker is found, return false signifying
// that the stream is finished.
if (value == EOS) {
return false;
}
// Accept the next value.
action.accept(value);
} catch (InterruptedException e) {
return false;
}
return true;
}
public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
return StreamSupport.stream(spliterator, true);
}
}
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人
我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola