Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
正如你所看到的,我所做的是:从Kafka那里获取消息,将其转换成一个意在新目的地的对象,然后将其发送到目的地,然后确认偏移量以标记消息为已消费和已处理。以与从Kafka消费的消息相同的顺序确认偏移量是非常关键的,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用FlatMapSequential
来确保这一点。
为了简单起见,我们假设TransformToOutputFormat()
方法是一个标识转换。
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
performAction()
方法需要通过网络执行某些操作,例如调用HTTP REST API。因此适当的API返回一个Mono,这意味着需要订阅链。此外,我需要此方法返回receiverrecord
,以便可以在上面的flatMapSequential()运算符中确认偏移量。因为我需要订阅的单声道,所以我使用上面的FlatMapSequential
。如果不是,我可以使用映射
。
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
我在这个方法中有两个相互冲突的需求:1。订阅进行HTTP调用的链2。返回ReceiverRecord
使用flatMap()意味着返回类型更改为Mono。在同一位置使用doOnNext()将保留链中的ReceiverRecord,但不允许自动订阅HttpClient响应。
我无法在asstring()
之后添加.subscribe()
,因为我想等到完全接收到HTTP响应后再确认偏移量。
结果,我需要作弊并从方法范围返回record
对象。
另一件事是在PerformAction
内重试时,它会切换线程。由于flatMapSequential()急切地订阅外部通量中的每个Mono,这意味着虽然可以按顺序保证对偏移量的确认,但我们不能保证PerformAction
中的HTTP调用将按相同的顺序执行。
所以我有两个问题。
这里是我想出的解决办法。
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.delayUntil(this::performAction)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
我没有使用flatMapSequential订阅performAction Mono并保留序列,而是延迟了从Kafka接收器请求更多消息的请求,直到执行该操作。这就实现了我所需要的一次一次的处理。
因此,performAction不需要返回ReceiverRecord的单声道。我还将其简化为:
public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?
本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者
问题内容: 我有大量数据,并且想要调用缓慢但干净的方法,而不是调用带有第一个结果的副作用的快速方法。我对中间结果不感兴趣,所以我不想收集它们。 明显的解决方案是创建并行流,进行慢速调用,再次使流顺序进行,然后进行快速调用。问题是,所有代码都在单个线程中执行,没有实际的并行性。 示例代码: 如果我删除,代码将按预期执行,但是很明显,非并行操作将在多个线程中调用。 您能推荐一些有关这种行为的参考,或者
我有一个JMS队列,在那里我可以获得几种类型的消息。所有消息都是对象消息。我让Spring DMLC处理它们,并尝试将它们传递到外部endpoint。 每种消息类型都与不同的endpoint相关联。 如果消息必须传递到1个或多个endpoint,我认为阻塞使用者对我没有帮助。 我找不到任何非阻塞消费者的信息,我想知道它是否真的帮助我解决我的问题。
问题内容: 我想使用打开管道,并对其具有非阻塞的“读取”访问权限。 我该如何实现? (我发现的示例都是阻塞/同步的) 问题答案: 设置如下: 现在您可以阅读: 完成后,清理:
现在我们知道如何在一个指定I/O调度器上来调度一个任务,我们可以修改storeBitmap()函数并再次检查StrictMode的不合规做法。为了这个例子,我们可以在新的blockingStoreBitmap()函数中重排代码。 private static void blockingStoreBitmap(Context context, Bitmap bitmap, String filena