当前位置: 首页 > 知识库问答 >
问题:

如何将非并行流与并行流互连(一个生产者多个消费者)

黄成荫
2023-03-14

我试图用Java8中的流创建一个生产者多个消费者模型。我正在从数据库资源中读取和处理数据,我想以流式方式处理它们(不能将整个资源读取到内存中)。

class SimpleIterator<Data> implements Iterator<Data>{

    private volatile Cursor cursor;

    public SimpleIterator(Cursor cursor){
        this.cursor = cursor;
    }

   @Override
    public boolean hasNext() {
        return cursor.hasNext();
    }    

    @Override
    public Data next() {
     return cursor.next();

    }
}
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
resultStream.parallel().forEach(data->processData(data)); 
public processData(Data data){
//heavy operation
}

共有1个答案

裴曜灿
2023-03-14

我正在处理一个问题,我需要在两个流上做一个完整的外部联接。问题似乎相似。我所做的是插入两个阻塞队列来缓冲我的输入。我认为您可以使用一个阻塞队列来执行类似的操作,将单个流拆分为多个流,而无需并行源流。

我提出的解决方案可以在下面找到。我还没有测试连接两个流的解决方案,所以我不确定这是否有效。AbstractSpliterator类有一个TrySplit的实现;trySplit上的注释信息丰富。类的最后一个方法从拆分器实现中构造一个可并行的流。

import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
    final T EOS = null; // Just a stub -- can't put a null in BlockingQueue

    private final BlockingQueue<T> queue;
    private final Thread thread;

    // An implementation of Runnable that fills a queue from a stream
    private class Filler implements Runnable {
        private final Stream<T> stream;
        private final BlockingQueue<T> queue;

        private Filler(Stream<T> stream, BlockingQueue<T> queue) {
            this.stream = stream;
            this.queue = queue;
        }

        @Override
        public void run() {
            stream.forEach(x -> {
                try {
                    // Blocks if the queue is full
                    queue.put(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            // Stream is drained put end of stream marker.
            try {
                queue.put(EOS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
        super(estSize, characteristics);
        queue = new ArrayBlockingQueue<T>(1024);
        // Fill the queue from a separate thread (may want to externalize this).
        thread = new Thread(new Filler(srcStream, queue));
        thread.start();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            T value = queue.take(); // waits (blocks) for entries in queue

            // If end of stream marker is found, return false signifying
            // that the stream is finished.
            if (value == EOS) {
                return false;
            }
            // Accept the next value.
            action.accept(value);
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
        Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
        return StreamSupport.stream(spliterator, true);
    }
}
 类似资料:
  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人

  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola