当前位置: 首页 > 知识库问答 >
问题:

如何使用返回单声道的generate wrapping调用创建流量

盛跃
2023-03-14

我有一个例子,我想使用通量。生成,因为我不想进行昂贵的阻塞呼叫,除非/直到用户提出要求。具体来说,我多次调用Elasticsearch(有效地进行分页),直到没有更多点击。我在迭代器中使用标准的阻塞调用实现了这一点

下面是前面使用阻塞的代码:


  public Iterator<SearchResponse> createDeepQueryIterator(@NonNull PITSearchInput input){
    return new PointInTimeIterator(elasticClient, input);
  }

  public Flux<SearchResponse> createDeepQueryFlux(@NonNull PITSearchInput input){
    return Flux.<SearchResponse, PointInTimeIterator>generate(
            () -> new PointInTimeIterator(elasticClient, input),
            (deepQueryIterator, sink) -> {
              if (deepQueryIterator.hasNext()) {
                sink.next(deepQueryIterator.next());
              }else{
                sink.complete();
              }
              return deepQueryIterator;
            },
            (deepQueryIterator) -> deepQueryIterator.shutdown())
        .subscribeOn(Schedulers.boundedElastic());
  }

上面的工作原理很好,因为它等待对ES进行下一次调用,直到(该)订阅者准备好接收下一个数据块。

在下面,我尝试使用Spring的ReactiveElasticsearch chClient,但问题是在订阅者处理第一个之前对ES进行了多次调用。


  public Flux<SearchResponse> createDeepQuery(PointInTimeIteratorFactory.PITSearchInput input) {
    log.info("Creating flux");

    AtomicReference<PitId> pitId = new AtomicReference<>();
    AtomicInteger count = new AtomicInteger();

    Mono<PitId> pitIdMono =
        Mono.fromCallable(
            () -> {
              pitId.set(createPIT(input));
              return pitId.get();
            })
        .subscribeOn(Schedulers.boundedElastic());
    Mono<SearchResponse> searchResponseMono =
        pitIdMono.flatMap(
            p -> {
              log.info("Calling search");
              return reactiveElasticsearchClient.searchForResponse(createSearchRequestFrom(p, input));
            });
    Flux<SearchResponse> expand =
        searchResponseMono
            .expand(
                (searchResponse -> {
                  int hitCount = searchResponse.getHits().getHits().length;
                  count.addAndGet(hitCount);
                  log.info("Previous returned {} hits totaling {}", hitCount, count.get());

                  if (count.get() > input.getMaxTotalSize()
                  || hitCount < input.getMaxSizePerQuery()){
                    log.info("Returning empty");
                    return Mono.empty();
                  }

                  log.info("Calling search");
                  pitId.set(new PitId(searchResponse.pointInTimeId()));
                  return reactiveElasticsearchClient.searchForResponse(
                      createSearchRequestFrom(searchResponse, input));
                }))
            .doFinally(
                p -> {
                  deletePIT(pitId.get());
                });
    return expand;
  }

因此,问题是不要使用反应式客户端返回单声道的能力

下面是通量的记录-

2021-12-02 13:13:37.300  INFO 13704 --- [           main] a.a.t.ReactivePointInTimeIteratorFactory : Creating flux
2021-12-02 13:13:37.346  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Creating PIT
2021-12-02 13:13:37.407  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.176  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 50
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877306267
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 100
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606162
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 150
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.272  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606362
2021-12-02 13:13:38.311  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 200
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877906244
2021-12-02 13:13:38.344  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 250
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Returning empty
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Closing PIT ReactivePointInTimeIteratorFactory.PitId(id=m_2xAwENYWN0aXZpdHlzdG9yZRZQQkRGWldmclI2cWZITEpoWDI1cGlRABZCZU8xbm55ZlFabXREYmNEdThESG1RAAAAAAAAWQcTFm5BcXdPU2xTUWE2bEU4dkVPVkpkWFEBFlBCREZaV2ZyUjZxZkhMSmhYMjVwaVEAAA==)
2021-12-02 13:13:40.171  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877306066]
2021-12-02 13:13:42.172  INFO 13704 --- [     parallel-2] p.actss.activity.store.PitTest           : [1634877306272]
2021-12-02 13:13:44.172  INFO 13704 --- [     parallel-3] p.actss.activity.store.PitTest           : [1634877606166]
2021-12-02 13:13:46.173  INFO 13704 --- [     parallel-4] p.actss.activity.store.PitTest           : [1634877906057]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877906248]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : Complete
2021-12-02 13:13:48.174  INFO 13704 --- [           main] p.actss.activity.store.PitTest           : blah
2021-12-02 13:13:48.175  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : onComplete

更新:添加PitTest代码以确保完整性:


  @Test
  void testReactoiveFluxIt() throws InterruptedException {
    Flux<SearchResponse> deepQuery = reactivePointInTimeIteratorFactory.createDeepQuery(...);

    deepQuery
        .delayElements(Duration.ofMillis(2000))
        .doOnNext(p -> log.info(Arrays.toString(p.getHits().getHits()[0].getSortValues()))) //
        .doOnComplete(() -> log.info("Complete")) //
        .doFinally(p -> log.info(p.toString()))
        .blockLast();
    log.info("blah");
    Thread.sleep(5000);
  }

共有1个答案

林夕
2023-03-14

delayElements切换到并行调度程序并将每个发出的元素延迟2秒。这就是为什么排序值会在之后打印的原因。

 类似资料:
  • 想象一下,我有一个CourseID对象列表(CourseID,Name)。让我们把这个列表称为'CourseNameList'。 当某人将请求发送到“ 然而,在发送结果之前,我还需要追加每个课程的价格。价格将从另一个微服务中检索,并返回Mono对象。 因此,用户将看到带有(ID、名称、价格)的课程列表。价格来源于其他服务。 控制器方法可能如下所示 我尝试了多种方法来返回Flux。但是,我不知道怎么

  • 嗨我刚开始学习反应式编程 我这里有这段代码,我的过程应该是我将调用TokenRepository来获取令牌,然后使用token.getAccessToken()作为cardRepository.findAllCards()上的参数 想知道这是否可行吗?

  • 我的代码如下。我需要从mongo db获得每次旅行的车费,然后将每次旅行的所有车费相加,得到总车费。我被一种我不知道如何阅读的单声道音乐所困扰。我试着把它转换成通量,但我得到了通量 "'

  • 我的REST控制器方法应该返回Mono,它必须由两个并行请求构建到另一个web服务,并处理它们的响应,其中一个请求返回Mono,另一个请求返回Flux 如何将单声道的响应与熔剂的响应结合起来进行处理? 显然,控制器是错误的,因为: 1)接受2个或更多的Mono,其中我有Mono和Flux-如何将它们组合起来? 2)也不确定: 是否正确? 有什么建议吗?

  • 我如何传递我的异常,以便使用者将看到我在单声道中传递的原始异常?希望我的问题很清楚,提前谢谢

  • 我们有以下架构 SQS(源)->SQS轮询器->我们的业务逻辑->Sink,它从SQS中删除消息。 这是一个akka流(我们的业务逻辑有多个阶段)。 现在我们希望通过添加HTTP服务器(而不是Akka HTTP)来扩展这个体系结构。 现在我们的服务也有了路径 我认为https://doc.akka.io/docs/akka/2.5/stream/operators/source/queue.htm