当前位置: 首页 > 知识库问答 >
问题:

在使用reactor kafka进行并发处理的同时,重试使用消息

穆招
2023-03-14

我有一个项目,在这个项目中,我使用来自传入主题的消息进行处理,并将它们发送到传出主题。现在考虑一下,由于某些基础结构问题导致发送消息时出错,传出主题已关闭。在这种情况下,我将提交到最后一条成功的消息(成功处理并发送到传出主题),并从发送失败的地方重试消费。

这可以在“非反应性”kafka中使用ack. nack(index,0017)轻松实现,代码如下所示:

@KafkaListener(
        autoStartup = "false",
        topics = "incoming-topic",
        containerFactory = "myListenerContainerFactory",
        groupId = "test-consumer"
)
public void consume(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
    for (int i = 0; i < consumerRecords.size(); i++) {
        ConsumerRecord<String, String> record = consumerRecords.get(i);
        try {
            JsonObject recordNode = gson.fromJson(record.value(), JsonObject.class);
            CompletableFuture<String> future1 = transformationService.transform1(recordNode);
            CompletableFuture<String> future2 = transformationService.transform2(recordNode);
            CompletableFuture.allOf(future1, future2).join();

            outgoingTopicProducer.sendMessage(record, recordNode); // method that calls producer
            ack.acknowledge();
        } catch (ApiException e) { // ApiException thrown from producer
            logger.error("Kafka exception occurred while sending message, blocking queue... ");
            ack.nack(i, 2000); // this enables to read same message again after 2 secs
        } catch (Exception e) { // ignore any other exception like JsonProcessingException
            logger.error("error occurred while processing message. Partition: {}, Offset: {}, Value: {}",
                    record.partition(),
                    record.offset(),
                    record.value());
            logger.error("exception: " + e.getMessage());
        }
    }
}

现在,为了获得更好的性能和处理背压,我想让这个代码变得被动。我还想并行处理每个分区。通过查看Reactorkafka文档,我得出了以下代码:

private final Scheduler scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "sample", 60, true);
private final KafkaSender<String, String> sender = KafkaSender.create(SenderOptions.create(getProducerConfig()));
@EventListener(ApplicationReadyEvent.class)
public void receiverFlux() {
    KafkaReceiver.create(getReceiverOptions())
        .receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
            partitionFlux.publishOn(scheduler)
                    .<SenderRecord<String, String, ReceiverOffset>>handle((record, sink) -> {
                        JsonObject recordNode = gson.fromJson(record.value(), JsonObject.class);

                        CompletableFuture<String> future1 = transformationService.transform1(recordNode);
                        Mono<String> mono1 = Mono.fromFuture(future1)
                                .subscribeOn(Schedulers.parallel())
                                .doFinally(sig -> {
                                    if (sig == SignalType.CANCEL) {
                                        future1.cancel(true);
                                    }
                                });

                        CompletableFuture<String> future2 = transformationService.transform2(recordNode);
                        Mono<String> mono2 = Mono.fromFuture(future2)
                                .subscribeOn(Schedulers.parallel())
                                .doFinally(sig -> {
                                    if (sig == SignalType.CANCEL) {
                                        future2.cancel(true);
                                    }
                                });

                        try {
                            Mono.when(mono1, mono2).block();
                            sink.next(SenderRecord.create(new ProducerRecord<>("outgoing-topic", record.partition(), record.timestamp(), record.key(), recordNode.toString(), record.headers()),
                                    record.receiverOffset()));
                        } catch (Exception e) {
                            sink.error(e);
                        }
                    })
                    .onErrorResume(e -> {
                        logger.error("error occurred wile processing record");
                        logger.error("exception: " + e.getMessage());
                        return Mono.empty();
                    })
                    .as(sender::send)
                    .doOnNext(record -> {
                        logger.info("ack offset");
                        record.correlationMetadata().acknowledge();
                    })
                    .doOnError(e -> {
                        logger.error("error occurred while sending message");
                        logger.error("exception: " + e.getMessage());
                    })
                    .retryWhen(Retry.backoff(10, Duration.ofSeconds(2)))

        ).onErrorResume(e -> {
            logger.error("Restart consumer ....");
            return Mono.empty();
        })
        .repeat()
        .subscribe();
}

为了测试这一点,我将传出主题的名称改为不正确的主题,并得到以下日志:

{"@message":"request(1)"}
{"@message":"[Producer clientId=clientId] Error while fetching metadata with correlation id 1 : {incorrect-topic=TOPIC_AUTHORIZATION_FAILED}"}
{"@message":"[Producer clientId=clientId] Topic authorization failed for topics [incorrect-topic]"}
{"@message":"Sender failed","@fields":{"level":"ERR","throwable":{"message":"Not authorized to access topics: [incorrect-topic]"}}}
{"@message":"error occurred while sending message"}
{"@message":"exception: Not authorized to access topics: [incorrect-topic]"}
{"@message":"Scheduler worker in group main failed with an uncaught exception","@fields{"level":"ERR","throwable":{"stackTrace":[{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"runAsync","fileName":"FluxPublishOn.java","lineNumber":1004,"nativeMethod":false,"className":"reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber"},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"run","fileName":"FluxPublishOn.java","lineNumber":1067,"nativeMethod":false,"className":"reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber"},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":84,"nativeMethod":false,"className":"reactor.core.scheduler.WorkerTask"},{"classLoaderName":"app","moduleName":null,"moduleVersion":null,"methodName":"call","fileName":"WorkerTask.java","lineNumber":37,"nativeMethod":false,"className":"reactor.core.scheduler.WorkerTask"},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"15","methodName":"run","fileName":"FutureTask.java","lineNumber":264,"nativeMethod":false,"className":"java.util.concurrent.FutureTask"},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"15","methodName":"run","fileName":"ScheduledThreadPoolExecutor.java","lineNumber":304,"nativeMethod":false,"className":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask"},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"15","methodName":"runWorker","fileName":"ThreadPoolExecutor.java","lineNumber":1130,"nativeMethod":false,"className":"java.util.concurrent.ThreadPoolExecutor"},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"15","methodName":"run","fileName":"ThreadPoolExecutor.java","lineNumber":630,"nativeMethod":false,"className":"java.util.concurrent.ThreadPoolExecutor$Worker"},{"classLoaderName":null,"moduleName":"java.base","moduleVersion":"15","methodName":"run","fileName":"Thread.java","lineNumber":832,"nativeMethod":false,"className":"java.lang.Thread"}],"message":"Cannot invoke \"java.util.Queue.isEmpty()\" because \"q\" is null","suppressed":[],"localizedMessage":"Cannot invoke \"java.util.Queue.isEmpty()\" because \"q\" is null"}}}

因此,观察结果是,此代码不会在失败的情况下重试发送消息,当新消息到来时,它会读取新消息,删除最后一条失败的消息,导致该消息丢失。优先级是我不想丢失任何消息。

请帮助我实现这些功能。欢迎任何其他有助于提高性能的建议。

共有1个答案

解鸿运
2023-03-14

我能够通过在ReceiverRecord通量周围使用defer运算符来解决这个问题。

Flux.defer(() -> KafkaReceiver.create(getReceiverOptions())
    .receive())

代码的其余部分不需要任何更改。

 类似资料:
  • 我附上了一个应用程序的示例代码,它在我的Core i3 370M笔记本电脑上(Win 7 64bit,Java 1.8.0.4564bit)在大约20秒内重现了这个问题。这个应用程序读取识别文本蕴涵(RTE)语料库的XML文件,然后使用标准Java并发类同时解析所有句子。本地RTE XML文件的路径需要作为命令行参数给出。在我的测试中,我使用了以下公开的XML文件:http://www.nist.

  • 我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅

  • 我正在使用在每一行上执行一个函数,这需要很长时间,为了加快速度,有没有一种方法可以使用并行处理,使多个核心在不同的行上并发工作? 例如,我将PRISM天气数据(https://prism.oregonstate.edu/)聚合到州一级,同时按人口加权。这是基于https://www.patrickbaylis.com/blog/2021-08-15-pop-weighted-weather/. 请

  • 我正在使用Kafka活页夹的Spring Cloud Stream。它工作得很好,但客户端接收到重复的消息。已经尝试了所有Kafka消费属性,但没有结果。 在我的应用程序示例中检查2个类-Aggregate Application和EventFilterApplication。如果我运行EventFilterApplication-只有1条消息,如果是Aggregate Application-2

  • 我很难在Kafka主题的消费者中找到处理异常的简单模式。场景如下:在消费者中,我调用一个外部服务。如果服务不可用,我想重试几次,然后停止消费。 最简单的模式似乎是一种处理它的阻塞同步方式,在Java中如下所示: 但是,我觉得必须有一种更简单的方法(不使用第三方库),并且避免阻塞线程。 这似乎是我们想要的一种常见的东西,但我找不到一个简单的例子来说明这种模式。

  • 是否可以使用使用夸克和小黑-反应-消息注释的相同方法并行处理多个amqp-消息? 更准确地说,我有以下类: 使用应用程序属性中的配置: 现在,我想通过配置来定义可以并行处理多少条消息。例如,在4核cpu上,它应该并行运行4个内核。 目前,我只能添加4个具有不同名称的方法副本,以允许这种并行性,但这是不可配置的。

  • 我在chrome中遇到了一个错误:“您正在使用一个不受支持的命令行标志-忽略-证书-错误。稳定性和安全性将受到影响。”为我下面的硒代码。 我在下面的链接中搜索了相同的错误解决方案。 http://www.abodeqa.com/tag/how-to-remove-you-are-using-an-unsupport-command-line-flag-ignore-certifice-errors

  • 问题内容: 从文档 如果遇到需要插入1000 000行/对象的情况: 为什么我们应该使用这种方法?与StatelessSession一相比,它给我们带来了什么好处: 我的意思是,这个(“替代”)最后一个示例不使用内存,不需要进行同步,清除缓存,那么对于这样的情况,这应该是最佳实践吗?那么为什么要使用前一个呢? 问题答案: 从文档中,您链接到: 特别是,无状态会话不会实现第一级缓存,也不会与任何第二