我正在尝试批量记录不断发出从一个流源(Kafka)和调用我的服务在一批100。
作为输入,我得到的是一个单一的记录。我正在尝试什么是最好的方式来实现它的反应方式使用Spring反应堆,而不需要有一个突变和锁定在管道外。
Mono.just(input)
.subscribe(i -> {
batches.add(input);
if(batches.size() >= 100) {
// Invoke another reactive pipeline.
// Clear the batch (requires locking in order to be thread safe).
}
});
什么是最好的方法,以实现分批在一个流式源使用反应器。
使用flux.buffer
或flux.bufferTimeout
可以将固定数量的元素收集到列表
中
StepVerifier.create(
Flux.range(0, 1000)
.buffer(100)
)
.expectNextCount(10)
.expectComplete()
.verify()
在情况下,当输入是一个单个值时,假设like调用带有参数的方法:
public void invokeMe(String element);
您可以采用unicastprocessor
技术,将所有数据传送到该处理器,这样它就可以处理批处理
class Batcher {
final UnicastProcessor processor = UnicastProcessor.create();
public void invokeMe(String element) {
processor.sink().next(element);
// or Mono.just(element).subscribe(processor);
}
public Flux<List<String>> listen() {
return processor.bufferTimeout(100, Duration.ofSeconds(5));
}
}
Batcher batcher = new Batcher();
StepVerifier.create(
batcher.listen()
)
.then(() -> Flux.range(0, 1000)
.subscribe(i -> batcher.invokeMe("" + i)))
.expectNextCount(10)
.thenCancel()
.verify()
请注意,unicastporcessor
只允许一个订阅者,因此当批处理结果中有一个相关方和许多数据生产者时,它将对模型有用。如果您的订阅服务器和生产者一样多,您可能希望使用下一个处理器-->DirectProcessor
、TopicProcessor
、WorkerQueueProcessor
。要了解更多关于反应堆处理器的信息,请访问以下链接
问题内容: 我需要在 不更改URL的情况下 进行路由。 在自己实现此功能之前,我尝试过通过React Router寻找一些东西。我看到有这样一个东西 : createMemoryHistory([options]) createMemoryHistory创建一个不与浏览器URL交互的内存历史对象。当您需要自定义用于服务器端呈现,自动测试的历史记录对象或不想操纵浏览器URL(例如,将应用程序嵌入到i
我使用以下命令从Python:3.8映像创建了一个新容器:,每次我必须在该容器中运行解释器。 我希望运行一个新容器,然后直接进入以运行其他内容。然后我可以使用Regan的方法离开容器而不停止它。这可能吗,或者这有什么意义吗?(对docker来说还是个新手)
我有一个雇主组合。选择雇主后,表格中会填入特定于雇主的数据: 下面是它绑定到的属性和检查表是否脏的方法。如果表是脏的,则提示用户如果他们改变雇主,改变将丢失: 一切似乎都正常工作: 用户选择更新表的雇主('KMH')。 用户对表进行更改。 用户然后选择不同的雇主('MPC') 提示用户更改将丢失 用户选择“否”并且Can火雇主返回“假” 选择雇主不改变(跳过if{}块) 然而,在GUI中,雇主选项
问题内容: 我一直在寻找一种不用使用collections.sort就可以对数组列表进行排序的方法,因为我自己的逻辑有缺陷,而且我遇到了很多麻烦。 我需要对它进行排序,以便可以使用我创建的一种方法,该方法基本上可以执行collections.swap的工作,以便对数组列表进行完全排序。 这是我的代码: 我对此一直很烦恼。抱歉,这是在伤害社区。 问题答案: 我想,你希望下面的算法:在阵列的其余部分发
问题内容: 在SQL(SQL Server)中,是否可以从表的标识列中检索下一个ID(整数),而实际上无需插入行?如果删除了最近的行,则不一定是最高ID加1。 我之所以这样问,是因为我们有时不得不用新行更新活动数据库。该行的ID在我们的代码中使用(例如,Switch(ID){Case ID:},并且必须相同。如果我们的开发数据库和实时数据库不同步,最好预先预测一个行ID部署之前。 我当然可以 SE