当前位置: 首页 > 知识库问答 >
问题:

在使用包括重试的流量时顺序调用非阻塞操作

明宜年
2023-03-14
    Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    });

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();

正如你所看到的,我所做的是:从Kafka那里获取消息,将其转换成一个意在新目的地的对象,然后将其发送到目的地,然后确认偏移量以标记消息为已消费和已处理。以与从Kafka消费的消息相同的顺序确认偏移量是非常关键的,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用FlatMapSequential来确保这一点。

为了简单起见,我们假设TransformToOutputFormat()方法是一个标识转换。

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}

performAction()方法需要通过网络执行某些操作,例如调用HTTP REST API。因此适当的API返回一个Mono,这意味着需要订阅链。此外,我需要此方法返回receiverrecord,以便可以在上面的flatMapSequential()运算符中确认偏移量。因为我需要订阅的单声道,所以我使用上面的FlatMapSequential。如果不是,我可以使用映射

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));

我在这个方法中有两个相互冲突的需求:1。订阅进行HTTP调用的链2。返回ReceiverRecord

使用flatMap()意味着返回类型更改为Mono。在同一位置使用doOnNext()将保留链中的ReceiverRecord,但不允许自动订阅HttpClient响应。

我无法在asstring()之后添加.subscribe(),因为我想等到完全接收到HTTP响应后再确认偏移量。

结果,我需要作弊并从方法范围返回record对象。

另一件事是在PerformAction内重试时,它会切换线程。由于flatMapSequential()急切地订阅外部通量中的每个Mono,这意味着虽然可以按顺序保证对偏移量的确认,但我们不能保证PerformAction中的HTTP调用将按相同的顺序执行。

所以我有两个问题。

    null

共有1个答案

常英纵
2023-03-14

这里是我想出的解决办法。

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();

我没有使用flatMapSequential订阅performAction Mono并保留序列,而是延迟了从Kafka接收器请求更多消息的请求,直到执行该操作。这就实现了我所需要的一次一次的处理。

因此,performAction不需要返回ReceiverRecord的单声道。我还将其简化为:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + receiverRecord.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
 类似资料:
  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者

  • 问题内容: 我有大量数据,并且想要调用缓慢但干净的方法,而不是调用带有第一个结果的副作用的快速方法。我对中间结果不感兴趣,所以我不想收集它们。 明显的解决方案是创建并行流,进行慢速调用,再次使流顺序进行,然后进行快速调用。问题是,所有代码都在单个线程中执行,没有实际的并行性。 示例代码: 如果我删除,代码将按预期执行,但是很明显,非并行操作将在多个线程中调用。 您能推荐一些有关这种行为的参考,或者

  • 我有一个JMS队列,在那里我可以获得几种类型的消息。所有消息都是对象消息。我让Spring DMLC处理它们,并尝试将它们传递到外部endpoint。 每种消息类型都与不同的endpoint相关联。 如果消息必须传递到1个或多个endpoint,我认为阻塞使用者对我没有帮助。 我找不到任何非阻塞消费者的信息,我想知道它是否真的帮助我解决我的问题。

  • 问题内容: 我想使用打开管道,并对其具有非阻塞的“读取”访问权限。 我该如何实现? (我发现的示例都是阻塞/同步的) 问题答案: 设置如下: 现在您可以阅读: 完成后,清理:

  • 现在我们知道如何在一个指定I/O调度器上来调度一个任务,我们可以修改storeBitmap()函数并再次检查StrictMode的不合规做法。为了这个例子,我们可以在新的blockingStoreBitmap()函数中重排代码。 private static void blockingStoreBitmap(Context context, Bitmap bitmap, String filena