当前位置: 首页 > 知识库问答 >
问题:

Spring反应器在不突变的情况下对输入进行配料

司空奕
2023-03-14

我正在尝试批量记录不断发出从一个流源(Kafka)和调用我的服务在一批100。

作为输入,我得到的是一个单一的记录。我正在尝试什么是最好的方式来实现它的反应方式使用Spring反应堆,而不需要有一个突变和锁定在管道外。

 Mono.just(input)
                .subscribe(i -> {
                     batches.add(input);
                     if(batches.size() >= 100) {
                         // Invoke another reactive pipeline.
                         // Clear the batch (requires locking in order to be thread safe).
                     }
                });

什么是最好的方法,以实现分批在一个流式源使用反应器。

共有1个答案

曹经业
2023-03-14

使用flux.bufferflux.bufferTimeout可以将固定数量的元素收集到列表

StepVerifier.create(
      Flux.range(0, 1000)
          .buffer(100)
   )
   .expectNextCount(10)
   .expectComplete()
   .verify()

在情况下,当输入是一个单个值时,假设like调用带有参数的方法:

public void invokeMe(String element);

您可以采用unicastprocessor技术,将所有数据传送到该处理器,这样它就可以处理批处理

class Batcher {

   final UnicastProcessor processor = UnicastProcessor.create();

   public void invokeMe(String element) {
       processor.sink().next(element);
       // or Mono.just(element).subscribe(processor);
   }


   public Flux<List<String>> listen() {
       return processor.bufferTimeout(100, Duration.ofSeconds(5));
   }
}

Batcher batcher = new Batcher();

StepVerifier.create(
      batcher.listen()
   )
   .then(() -> Flux.range(0, 1000)
                   .subscribe(i -> batcher.invokeMe("" + i)))
   .expectNextCount(10)
   .thenCancel()
   .verify()

请注意,unicastporcessor只允许一个订阅者,因此当批处理结果中有一个相关方和许多数据生产者时,它将对模型有用。如果您的订阅服务器和生产者一样多,您可能希望使用下一个处理器-->DirectProcessorTopicProcessorWorkerQueueProcessor。要了解更多关于反应堆处理器的信息,请访问以下链接

 类似资料:
  • 问题内容: 我需要在 不更改URL的情况下 进行路由。 在自己实现此功能之前,我尝试过通过React Router寻找一些东西。我看到有这样一个东西 : createMemoryHistory([options]) createMemoryHistory创建一个不与浏览器URL交互的内存历史对象。当您需要自定义用于服务器端呈现,自动测试的历史记录对象或不想操纵浏览器URL(例如,将应用程序嵌入到i

  • 我使用以下命令从Python:3.8映像创建了一个新容器:,每次我必须在该容器中运行解释器。 我希望运行一个新容器,然后直接进入以运行其他内容。然后我可以使用Regan的方法离开容器而不停止它。这可能吗,或者这有什么意义吗?(对docker来说还是个新手)

  • 我有一个雇主组合。选择雇主后,表格中会填入特定于雇主的数据: 下面是它绑定到的属性和检查表是否脏的方法。如果表是脏的,则提示用户如果他们改变雇主,改变将丢失: 一切似乎都正常工作: 用户选择更新表的雇主('KMH')。 用户对表进行更改。 用户然后选择不同的雇主('MPC') 提示用户更改将丢失 用户选择“否”并且Can火雇主返回“假” 选择雇主不改变(跳过if{}块) 然而,在GUI中,雇主选项

  • 问题内容: 我一直在寻找一种不用使用collections.sort就可以对数组列表进行排序的方法,因为我自己的逻辑有缺陷,而且我遇到了很多麻烦。 我需要对它进行排序,以便可以使用我创建的一种方法,该方法基本上可以执行collections.swap的工作,以便对数组列表进行完全排序。 这是我的代码: 我对此一直很烦恼。抱歉,这是在伤害社区。 问题答案: 我想,你希望下面的算法:在阵列的其余部分发

  • 问题内容: 在SQL(SQL Server)中,是否可以从表的标识列中检索下一个ID(整数),而实际上无需插入行?如果删除了最近的行,则不一定是最高ID加1。 我之所以这样问,是因为我们有时不得不用新行更新活动数据库。该行的ID在我们的代码中使用(例如,Switch(ID){Case ID:},并且必须相同。如果我们的开发数据库和实时数据库不同步,最好预先预测一个行ID部署之前。 我当然可以 SE