当前位置: 首页 > 知识库问答 >
问题:

如何在不关闭流的情况下从流量中收集

景鹏飞
2023-03-14

我的做法是创建一个反应endpoint,如下所示:

public Flux<ServerEvent> getEventFlux(Long forId){
    ServicePoller poller = new ServicePollerImpl();
    Map<String,Object> params =  new HashMap<>();
    params.put("id", forId);

    Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
    Flux<ServerEvent> serverEventFlux =
            Flux.fromStream(
                    poller.getEventStream(url, params) //poll a given endpoint after a fixed duration.
            );
    Flux<ServerEvent> sourceFlux= Flux.zip(interval, serverEventFlux)
             .map(Tuple2::getT2); // Zip the two streams. 
/* Here I want to store data from sourceFlux into a collection whenever some data arrives without disturbing the downstream processing in Spring. So that I can access collection later on without polling again */

这会在数据可用时立即将其发回前端,然而,我的第二个用例是在数据到达时将其汇集到一个单独的集合中,这样,如果以后有类似的请求到达,我就可以从池中卸载整个数据,而不必再次访问服务。

在不关闭流量流的情况下,我有什么选择来访问流量并在它们到达时将值存储到集合中?

遇到异常:

java.lang.IllegalStateException:stream已经在java.util.stream.abstractPipeline.spliterator(abstractPipeline.java:343)~[NA:1.8.0_171]在java.util.stream.referencePipeline.iterator(referencePipeline.java:139)~[NA:1.8.0_171]在reactor.core.publisher.fluxstream.subscribe(jar:3.1.7.发布]在reactor.core.publisher.fluxzip$zipcoordator.subscribe(fluxzip.java:573)~[reactor-core-3.1.7.release.jar:3.1.7.发布]在reactor.core.publisher.fluxzip.handleboth(fluxzip.java:326)~[reactor-core-3.1.7.release.jar:3.1.7.发布]在reactor.core.publisher.fluxzip.jar:3.1.7.发布]

共有1个答案

包永新
2023-03-14

poller.getEventStream返回一个只能使用一次的Java8流。您可以先将流转换为集合,或者通过使用供应商推迟poller.getEventStream的执行:

Flux.fromStream(
  () -> poller.getEventStream(url, params)
);
 类似资料:
  • 假设我有一个未来对象的列表<代码>列表 但此方法必须等待所有期货返回,然后才能返回流 我想让它像管道一样工作,上层从流中提取数据并根据需要进行处理,无需等待所有的未来完成。

  • 我想关闭/终止/扔掉一个由一行代码创建的JPanel(因此它被完全终止,而不仅仅是隐藏并在后台作为

  • 如果我关闭一个扫描仪对象,并创建一个新的对象,并尝试读取更多的输入,我将得到异常。 我的代码工作正常,但是如果我不关闭扫描仪,它会给我一个警告。但是,如果我关闭它以摆脱警告,我也关闭...我如何避免这种情况? 另外,不关闭扫描仪是否有任何后果? 编辑:这是我的代码: 这是NameAddressExists()方法: 这是PanNumberExists()方法: 从以下main()方法调用这些方法:

  • 问题内容: 我想知道如果不手动关闭流,何时关闭。我的意思是,如果引用的范围不再存在,流将被关闭吗? 请考虑以下示例方案。 在这里,一旦完成流处理,我将退出,但是反过来将继续执行该程序的程序不会终止,而是继续进行其他操作。 我没有关闭溪流。一旦对A类的引用范围结束,它会自动关闭吗?(即何时结束)?GC会照顾吗?另外,我读到,一旦流程结束,流将关闭,并且系统释放为其他进程保留的所有资源。我们如何检查流

  • 我是函数式编程新手,我正在努力变得更好。 目前,我正在试验一些代码,采用以下基本形式: 首先,使用hashmap获取列表中每个数字的频率。下一步,我们总结出地图中作为键存在的键的数量。 我不喜欢的是,这两个流需要彼此分开存在,其中一个HashMap是由一个流生成的,只会被另一个流立即独占地使用。 有没有一种方法可以将其合并为一个流?我是这样想的: 但这里的问题是没有可参考的freq map,因为它

  • 问题内容: 这个问题是关于stdin,stdout和stderr及其处理的。 我们的项目中有一个课程是对的扩展。这里有一个安静的新方法来关闭适当的Process- Object的std流吗?还是不合适? 此类方法基本上用在finally块中。 我真正想知道的是此实施是否安全?考虑如下情况:流程对象在生命周期中是否总是返回相同的stdin,stdout和stderr流?或者可能我想念关闭之前由过程返