当前位置: 首页 > 知识库问答 >
问题:

如何使用ApacheKafka实现“恰好一次”kafka消费者?

龚伯寅
2023-03-14

我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放

我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。

消费者设置:

   autoOffsetReset: earliest
   autoCommitEnable: false
   allowManualCommit: true
   breakOnFirstError: true
   group.id : CONSUMER.GROUP.ID
   count: 3
   max.poll.records = 1  # rollback when message processing fails.

生产者设置:

   idempotence: true
   transactionIdPrefix: txn-prefix-id

豆接线:

   @Bean
    SpringTransactionPolicy springTransactionPolicy() throws Exception {
        SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
        txRequired.setTransactionManager(transactionManager());
        txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return txRequired;
    }

    @Bean
    public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
        DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
                kafkaConfigs());
        // enable transaction manager
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }


    @Bean
    @Primary
    public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager(),jpaTransactionManager());
    }

    @Bean
    public PlatformTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
        kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        return kafkaTransactionManager;
    }

    @Bean
    JpaTransactionManager jpaTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setRollbackOnCommitFailure(true);
        return transactionManager;
    }

骆驼路线:

public RoutesBuilder inboundRoute() {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                //Common error handler
                onException(UnsupportedMessageTypeException.class).
                   maximumRedeliveries(redeliveryCount).
                   handled(true).
                   bean(ExceptionPropagatorProcessor.class, "process").
                   bean(manualCommitProcessor).
                end();

                onException(AppRuntimeException.class).
                   maximumRedeliveries(redeliveryCount).                
                   bean(ExceptionPropagatorProcessor.class, "process")
                end();  

                onException(RetryExhaustedException.class).
                   maximumRedeliveries(0).// No retry for this exception
                   handled(true).
                   bean(ExceptionPropagatorProcessor.class, "process").
                   bean(kafkaManualCommitProcessor).
                end(); 

                from("kafka:inboundTopic").
                    routeId("consume-msg").
                    transacted("springTransactionPolicy").
                               bean(transactionBeginProcessor).
                               //check if this is a retry scenario, the max retry count reached then throw RetryExhaustedException.
                               bean(retryEvaluationProcessor). 
                               bean(enrichProcessor). // publish kafka messages
                               bean(persistenceProcessor).
                               bean(transactionEndProcessor). // publish kafka messages
                    bean(manualCommitProcessor);

但当出现异常处理场景时,我们无法让Kafka制作人提交消息。我缺少什么,正确的方法是什么?

共有1个答案

阚允晨
2023-03-14

您似乎正在使用Spring Kafka,而他们的KafkaTransactionManager并不是一个真正支持XA的事务管理器(有关限制,请参阅他们的文档),因此您不能将其用于Kafka和JDBC数据库等的回滚。

并且 camel-kafka 不支持 Kafka 事务(目前)。我已经创建了一张票:https://issues.apache.org/jira/browse/CAMEL-15016

 类似资料:
  • 我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。 这是我的消费者配置: 由于某些原因,我在同一个应用程序中有多个使用者,如下所示。 尽管如此,根据关于“消费者线程安全”的合流文件 一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地

  • 问题内容: 因此,以某种方式(玩转),我发现自己使用了regex这样的正则表达式。 从逻辑上讲,对我来说,它应表示: (一个数字正好一次)正好两次,即一个数字正好两次。 但实际上,它似乎仅表示“一个数字仅一次”(因此忽略了)。 使用或相似,可以看到相似的结果。 为什么会这样?它是在regex / Java文档中的某个地方明确声明的,还是只是Java开发人员即时做出的决定,还是一个错误? 还是实际上

  • 当我只打开一次处理时,我会得到以下错误。注意:我们的应用程序非常安全,我们只允许Kafka用户和消费者访问他们明确需要的资源。 只有一次处理kafka流是否在所有流任务中使用每个流任务的消费者组而不是消费者组?

  • 我在一些关于堆栈溢出的答案中看到,通常在web中也看到,Kafka不支持消费确认,或者消费一次就很难实现。 在以下作为示例的条目中,有没有理由使用RabbitMQ而不是Kafka?,我可以读到以下语句: RabbitMQ将保留已消耗/已确认/未确认消息的所有状态,而Kafka则不保留 或 有人能解释一下为什么Kafka的“一次消费保证”很难实现吗?这与Kafka和RabbitMQ等其他更传统的消息

  • 如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费