当前位置: 首页 > 知识库问答 >
问题:

多分区ReactorKafka至少一次处理故障与偏移

燕嘉熙
2023-03-14

下面是从kafka主题(8分区)接收消息并对其进行处理的消费者代码

    @Component
    public class MessageConsumer {

        private static final String TOPIC = "mytopic.t";
        private static final String GROUP_ID = "mygroup";
        private final ReceiverOptions consumerSettings;
        private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

        @Autowired
        public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
        {
            this.consumerSettings=consumerSettings;
            consumerMessage();
        }

        private void consumerMessage()
        {

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

        Flux.defer(receiver::receive)
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                        partitionFlux.publishOn(scheduler)
                                .concatMap(m -> {
                                    LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
                                    return process(m.key(), m.value())
                                            .thenEmpty(m.receiverOffset().commit());
                                }))
                .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
                .doOnError(err -> {
                    handleError(err);
                }).retry()
                .doOnCancel(() -> close()).subscribe();

    }

    private void close() {
    }

    private void handleError(Throwable err) {
        LOG.error("kafka stream error : ",err);
    }

    private Mono<Void> process(String key, String value)
    {
        if(key.equals("error"))
            return Mono.error(new Exception("process error : "));

        LOG.error("message consumed : "+key);
        return Mono.empty();
    }


    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .subscription(topics);
    }


}
    @Bean(name="consumerSettings")
    public ReceiverOptions<String, String> getConsumerSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put("max.block.ms", "3000");
        props.put("request.timeout.ms", "3000");

        return ReceiverOptions.create(props);
    }

如果处理逻辑中没有返回错误,则所有工作都按预期进行。

但是,如果我抛出一个错误来模拟特定消息的处理逻辑中的异常行为,那么我就没有处理导致异常的消息。流将移动到下一条消息。

我想要实现的是,处理当前消息并提交偏移量,如果它成功,然后移动到下一个记录。

问候,

维诺特

共有1个答案

晏正豪
2023-03-14

下面的代码适用于我。其思想是重试配置的失败消息多次,如果它仍然失败,则将它移到失败队列并提交消息。同时并发地处理来自其他分区的消息。

如果来自特定分区的消息多次配置失败,那么在延迟后重新启动流,这样我们就可以通过不连续地命中依赖项故障来处理依赖项故障。

@Autowired
public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings,MessageProducer producer)
{
    this.consumerSettings=consumerSettings;
    this.fraudCheckService=fraudCheckService;
    this.producer=producer;
    consumerMessage();
}

private void consumerMessage() {

    int numRetries=3;

    Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

    KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

    Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition());

    Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
            Flux.just(b)
                    .concatMap(a -> {
                        LOG.error("processing message - order: {} offset: {} partition: {}",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition());

                        return process(a.key(), a.value()).
                                then(a.receiverOffset().commit())
                                .doOnSuccess(d -> LOG.info("committing  order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()))
                                .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()));
                    })
                    .retryWhen(companion -> companion
                            .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} " , b.key() , b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition(),s.getMessage()))
                            .zipWith(Flux.range(1, numRetries), (error, index) -> {
                                if (index < numRetries) {
                                    LOG.info(" --> Retying {} order: {} offset: {} partition: {} ", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    return index;
                                } else {
                                    LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    producer.sendMessages(ERROR_TOPIC,b.key(),b.value());
                                    b.receiverOffset().commit();
                                    //return index;
                                    throw Exceptions.propagate(error);
                                }
                            })
                            .flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 3)))
                            .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
                    ))
    );

    f1.doOnError(a ->  {
                LOG.info("Moving to next message because of : ", a);
                try {

                    Thread.sleep(5000); // configurable
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

    ).retry().subscribe(); 

}

public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
    return consumerSettings
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .subscription(topics);
}

private Mono<Void> process(OrderId orderId, TraceId traceId)
{
    try {

        Thread.sleep(500); // simulate slow response
    } catch (InterruptedException e) {
        // Causes the restart
        e.printStackTrace();
    }

   if(orderId.getId().startsWith("error")) // simulate error scenario
        return Mono.error(new Exception("processing message failed for order: " + orderId.getId()));

    return Mono.empty();
}
 类似资料:
  • 问题内容: 我有一个用于捕获任何分段错误或ctrl- c的应用程序。使用下面的代码,我能够捕获分段错误,但是该处理程序一次又一次地被调用。我该如何阻止他们。供您参考,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? 处理程序就是这样。 在这里,对于Segmentation故障信号,处理程序被多次调用,并且很明显MyfreeBuffers()给我释放已释放的内存的错误。我只想释放一

  • 我有一个应用程序,我用它来捕捉任何分割错误或ctrl-c。使用下面的代码,我能够捕获分段错误,但是处理程序被一次又一次地调用。我怎样才能阻止他们。告诉你,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? handler是这样的。 这里的分段故障信号,处理程序被多次调用,因为明显的MyFreeBuffers()给我释放已经释放的内存的错误。我只想免费一次,但仍然不想退出应用程序。

  • Webpack 的配置比较复杂,很容出现错误,下面是一些通常的故障处理手段。 一般情况下,webpack 如果出问题,会打印一些简单的错误信息,比如模块没有找到。我们还可以通过参数 --display-error-details 来打印错误详情。 $ webpack --display-error-details Hash: a40fbc6d852c51fceadb Version: webpa

  • 3.1 PG 无法达到 CLEAN 状态 创建一个新集群后,PG 的状态一直处于 active , active + remapped 或 active + degraded 状态, 而无法达到 active + clean 状态 ,那很可能是你的配置有问题。 你可能需要检查下集群中有关 Pool 、 PG 和 CRUSH 的配置项,做以适当的调整。 一般来说,你的集群中需要多于 1 个 OSD,

  • 进行 OSD 排障前,先检查一下 monitors 和网络。如果 ceph health 或 ceph -s 返回的是健康状态,这意味着 monitors 形成了法定人数。如果 monitor 还没达到法定人数、或者 monitor 状态错误,要先解决 monitor 的问题。核实下你的网络,确保它在正常运行,因为网络对 OSD 的运行和性能有显著影响。 2.1 收集 OSD 数据 开始 OSD

  • Monitor 维护着 Ceph 集群的信息,如果 Monitor 无法正常提供服务,那整个 Ceph 集群就不可访问。一般来说,在实际运行中,Ceph Monitor的个数是 2n + 1 ( n >= 0) 个,在线上至少3个,只要正常的节点数 >= n+1,Ceph 的 Paxos 算法就能保证系统的正常运行。所以,当 Monitor 出现故障的时候,不要惊慌,冷静下来,一步一步地处理。 1