我有一个通用的Streams API问题,我想“高效地”解决。假设我有一个(可能非常大,可能无限)流。我想以某种方式对其进行预处理,例如,过滤掉一些项目,并对一些项目进行变异。让我们假设这个预处理是复杂的,时间和计算密集型的,所以我不想做两次。
接下来,我想对项序列执行两组不同的操作,并使用不同的流类型构造处理每个不同序列的远端。对于无限流,这将是一个forEach,对于有限流,它可能是一个收集器或其他什么。
显然,我可能会将中间结果收集到一个列表中,然后从该列表中拖出两个单独的流,单独处理每个流。这将适用于有限的流,尽管a)它看起来“丑陋”,b)对于非常大的流来说可能不切实际,对于无限的流来说平坦不起作用。
我想我可以用peek作为一种“T恤”。然后,我可以对peek下游的结果执行一个处理链,并以某种方式强制peek中的消费者执行“其他”工作,但现在第二条路径不再是流。
我发现我可以创建一个BlockingQueue,使用peek将内容推送到该队列中,然后从队列中获取流。这似乎是一个很好的想法,实际上效果很好,尽管我不理解流是如何关闭的(它确实关闭了,但我看不出是如何关闭的)。下面的示例代码说明了这一点:
List<Student> ls = Arrays.asList(
new Student("Fred", 2.3F)
// more students (and Student definition) elided ...
);
BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();
ls.stream()
.peek(s -> {
try {
pipe.put(s);
} catch (InterruptedException ioe) {
ioe.printStackTrace();
}
})
.forEach(System.out::println);
new Thread(
new Runnable() {
public void run() {
Map<String, Double> map =
pipe.stream()
.collect(Collectors.groupingBy(s->s.getName(),
Collectors.averagingDouble(s->s.getGpa())));
map.forEach(
(k,v)->
System.out.println(
"Students called " + k
+ " average " + v));
}
}).start();
那么,第一个问题是:有没有“更好”的方法来做到这一点?
第二个问题,阻塞队列上的流是如何关闭的?
干杯,托比
有趣的问题。我先回答第二个问题,因为这是一个更简单的问题。
第二个问题,阻塞队列上的流是如何关闭的?
我认为“closed”的意思是,流有一定数量的元素,然后就结束了,而不考虑将来可能添加到队列中的任何元素。原因是队列上的流仅表示创建流时队列的当前内容。它不表示任何未来的元素,也就是说,其他线程将来可能添加的元素。
如果您想要一个表示队列当前和未来内容的流,那么您可以使用另一个答案中描述的技术。基本上用Stream.generate()
调用queue.take()
。不过,我不认为这是你想做的,所以我不会在这里进一步讨论。
现在谈谈你的大问题。
您有一个对象源,需要对其进行一些处理,包括过滤。然后,您需要获取结果并通过两个不同的下游处理步骤发送它们。基本上你有一个生产者和两个消费者。
您必须处理的基本问题之一是如何处理不同处理步骤以不同速率发生的情况。假设我们已经解决了如何从队列中获取流而不使流过早终止的问题。如果生产者产生元素的速度比消费者处理这个队列中的元素的速度快,队列将积累元素,直到填满所有可用的内存。
您还必须决定如何以不同的速率处理不同的消费者处理元素。如果一个耗电元件明显比另一个耗电元件慢,则可能需要缓冲任意数量的元件(这可能会填满内存),或者必须将较快的耗电元件的速度减慢,以匹配较慢耗电元件的平均速度。
让我给你画一张草图,说明你将如何进行。不过,我不知道你们的实际要求,所以我不知道这是否会令人满意。需要注意的一点是,在这种应用程序中使用并行流可能会有问题,因为并行流不能很好地处理阻塞和负载平衡。
首先,我将从一个流开始处理来自生产者的元素,并将它们累积到一个ArrayBlockingQueue
:
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
.filter(...)
.forEach(queue::put);
(请注意,put
抛出InterruptedException
,因此您不能将queue::put
放在这里。您必须在这里放置一个try-catch块,或者编写一个helper方法。但是,如果捕获到InterruptedException
怎么办并不明显。)
如果队列已满,这将阻塞管道。可以在自己的线程中顺序运行,或者在专用线程池中并行运行,以避免阻塞公共池。
其次,消费者:
while (true) {
// wait until the queue is full, or a timeout has expired,
// depending upon how frequently you want to continue
// processing elements emitted by the producer
List<T> list = new ArrayList<>();
queue.drainTo(list);
downstream1 = list.stream().filter(...).map(...).collect(...);
downstream2 = list.stream().filter(...).map(...).collect(...);
// deal with results downstream1 and downstream2
}
这里的关键是使用drainTo
方法批量完成从生产者到消费者的切换,该方法将队列的元素添加到目的地,并自动清空队列。这样,消费者就不必等待生产商完成其处理(如果是无限的,就不会发生这种情况)。此外,消费者在已知数量的数据上操作,不会在处理过程中阻塞。因此,如果有帮助的话,每个消费流都可以并行运行。
在这里,我让消费者步调一致。如果希望使用者以不同的速率运行,则必须构造额外的队列(或其他东西)来独立缓冲其工作负载。
如果消费者总体上比生产者慢,队列最终会被填满并被阻塞,使生产者慢到消费者可以接受的速度。如果消费者平均比生产者快,那么也许你不需要担心消费者的相对处理率。你可以让它们循环并拾取生产者设法放入队列的任何东西,甚至让它们阻塞,直到有东西可用。
我应该说,我所概述的是一种非常简单的多阶段流水线方法。如果您的应用程序是性能关键型的,您可能会发现自己在调整内存消耗、负载平衡、提高吞吐量和减少延迟方面做了大量工作。还有其他一些框架可能更适合您的应用程序。例如,你可以看看LMAX干扰器,尽管我自己对它没有任何经验。
我有一个由Java8流表示的数据集: 谢谢你的洞察力。
我想知道我是否能做这样的事情。假设我有一个数字流1-20。我想利用一个特性,比如drop 3(我想用Java术语来说是限制还是跳过?)并产生一个流,即数字流: 1-20、4-20、7-20等 然后可能平坦地将这些全部映射到一条溪流中。我尝试了使用Stream.iterate的各种组合,主要是从流生成流,但我一直收到一个IllegalStateExcema,说流已经操作或关闭。 例如,人们可能期望这
我写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题有50个分区 我的Kafka流代码有选择键()DSL操作,我有200万条记录使用相同的KEY。在流配置中,我已经完成了 因此,我能够使用完全相同的键使用不同的分区。如果我没有按预期使用轮循机制,我的所有消息都会转到同一分区。 直到现在一切都很好,但我意识到;当我使用RoundRobin分
我对流媒体有一个普遍的问题,但对于问题的范围,让我们限制自己使用Kafka Streams。让我们进一步缩小范围,将我们的问题局限于单词计数,或者可能是一般的计数。假设我有一个由某个键和一个值组成的流,键可以是一个字符串(假设我们可以有很多字符串,除了空字符串,由世界上的任何字符组成),值是一个整数,现在我们正在构建一个单词计数应用程序,如果词汇表中的单词总数是一万亿,我们不能将它们存储在本地缓存
在Spring Boot应用程序中,我试图配置Kafka流。用简单的Kafka主题,一切都很好,但我无法得到工作SpringKafka流。 这是我的配置: 我想创建一个基于主题的流。应用一个简单的转换并将此流中的消息发送到test主题。 我向发送以下消息,其中是我自己的复杂类型,但是我现在不知道如何将它转换为中的,以便能够在中使用它。 请建议如何使其工作。
我有一张班友名单 我想分组名单的foo根据他们的位置和总和,我如何实现它与Java流? 我所取得的成就: 我的代码: