当前位置: 首页 > 知识库问答 >
问题:

在通量中,如果内部通量最终为空,如何完成外部通量

况庆
2023-03-14

在这些条件下,请考虑以下代码:

生成通量

getOneResponePage(int)最终将返回一个空的通量

package ch.cimnine.test;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class PaginationTest {
    @Test
    public void main() {
        final Flux<Integer> finalFlux = getAllResponses();

        finalFlux.subscribe(resultItem -> {
            try {
                Thread.sleep(200); // Simulate heavy processing
            } catch (InterruptedException ignore) {
            }

            System.out.println(resultItem);
        });
    }

    private Flux<Integer> getAllResponses() {
        Flux<Flux<Integer>> myFlux = Flux.generate(
            () -> 0, // inital page
            (page, sink) -> {
                var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()

                // my way to check whether the `innerFlux` is now empty
                innerFlux.hasElements().subscribe(
                    hasElements -> {
                        if (hasElements) {
                            System.out.println("hasElements=true");
                            sink.next(innerFlux);
                            return;
                        }

                        System.out.println("hasElements=false");
                        sink.complete();
                    }
                );

                return page + 1;
            }
        );

        return Flux.concat(myFlux);
    }

    private Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        
        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i);
        }

        return Flux.empty();
    }
}

目标是有一个名为getAllResponses()的方法,该方法返回连续的流量

  1. 由于我是反应式编程的新手,我的想法对吗
  2. IntelliJ警告我,不建议在非阻塞上下文中调用“subscribe”。如何正确操作

在我的实际代码中,getOne响应页面(int)使用org.springframework.web.reactive.function.client.WebClient发送请求。它连接到返回结果的服务。该服务每次调用最多只返回1000个结果。必须发送偏移参数才能获得更多结果。

API有点奇怪,因为确定您是否拥有所有结果的唯一方法是使用不断增加的偏移量重复查询它,直到您得到一个空结果集。它会很乐意为仍在增加的偏移量返回更多空结果集(...直到达到偏移量的内部最大值并返回400错误请求。)

getOneResponePage(int)的实际实现几乎与此相同:

private Flux<ResponseItem> getOneResponePage(int page) {
    return webClientInstance
        .get()
        .uri(uriBuilder -> {
            uriBuilder.queryParam("offset", page * LIMIT);
            uriBuilder.queryParam("limit", LIMIT);
            // …
        })
        .retrieve()
        .bodyToFlux(ResponseItem.class);
}

共有1个答案

卢志强
2023-03-14

没有从内部流停止外部流的直接方法。最接近的方法是在内部序列中使用SwitchIf空Flux.error(NoSuchElementException),然后在外部序列中使用onErrorResumeNext并返回空Flux,如果它找到NoSuchElementException

Flux.just(listOf(1, 2, 3), listOf(), listOf(4, 5, 6))
.flatMap(list ->
     Flux.fromIterable(list)
     .switchIfEmpty(Flux.error(new NoSuchElementException()))
)
.onErrorResumeNext(e -> 
      e instanceof NoSuchElementException ?
      Flux.empty() : Flux.error(e)
);
 类似资料:
  • 我有这个界面: 我想测试它的实现,但我得到: 在测试中,我使用: 我错过了什么

  • 问题内容: 我正在使用函数,这样我的程序就不会一团糟,但我不知道如何将局部变量变成全局变量。 问题答案: 这是两种实现相同目的的方法: 使用参数并返回(推荐) 运行时,将获得以下输出 使用全局变量(永远不要这样做) 现在您将获得:

  • 问题内容: 如果省略,则会看到错误“ 无法在用其他方法定义的内部类中引用非最终变量jtfContent ”。 为什么匿名内部类必须要求外部类实例变量为final才能访问它? 问题答案: 首先,让我们放松一下,请放下那把枪。 好。现在,语言坚持的原因是它作弊是为了让你的内部类函数可以访问他们渴望的局部变量。运行时复制本地执行上下文(以及其他适当的内容),因此它坚持要求你进行所有操作,final以使事

  • 我现在正在使用Flux。我想创建一个通量 描述问题:我有一个工作案例,我有一个房屋清单和我所在的清单。我需要返回所有房屋的结果,我只会更改结果对象上的真/假标志。当然,第二个列表的元素可能更少。 有人能提出这样或其他的建议吗? }

  • 我正在进行spring webflux文件上传。我想从控制器上传文件到amazon S3 bucket上。在控制器中,我收到了以下物体 从文件部分。content()我可以 我的问题是如何转换这个通量