当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka事务已启用,但消费者仍收到回滚消息

梁丘翔
2023-03-14

我正在为我的生产者和消费者应用程序使用Spring kafka事务。

要求在生产者方面有多个步骤:将消息发送到kafka,然后保存到db。如果保存到db失败,则希望回滚发送给kafka的消息。

所以在消费者方面,我将isolation.leve设置为read_committed,如果消息是从kafka回滚的,消费者不应该阅读它。

生产商应用代码为:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  public ProducerFactory<String, Customer> producerFactory() {
    DefaultKafkaProducerFactory<String, Customer> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
    pf.setTransactionIdPrefix("customer.txn.tx-");
    return pf;
  }

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // create a minimum Producer configs
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://127.0.0.1:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://127.0.0.1:8081");

    // create safe Producer
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // kafka 2.0 >= 1.1 so we can keep this as 5. Use 1 otherwise.

    // high throughput producer (at the expense of a bit of latency and CPU usage)
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size
    return props;
  }

  @Bean
  public KafkaTemplate<String, Customer> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public KafkaTransactionManager kafkaTransactionManager(ProducerFactory<String, Customer> producerFactory) {
    KafkaTransactionManager<String, Customer> ktm = new KafkaTransactionManager<>(producerFactory);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
  }

  @Bean
  @Primary
  public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
    return new JpaTransactionManager(entityManagerFactory);
  }

  @Bean(name = "chainedTransactionManager")
  public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                             KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
  }
}


@Component
@Slf4j
public class KafkaProducerService {

  private KafkaTemplate<String, Customer> kafkaTemplate;
  private CustomerConverter customerConverter;
  private CustomerRepository customerRepository;

  public KafkaProducerService(KafkaTemplate<String, Customer> kafkaTemplate, CustomerConverter customerConverter, CustomerRepository customerRepository) {
    this.kafkaTemplate = kafkaTemplate;
    this.customerConverter = customerConverter;
    this.customerRepository = customerRepository;
  }

  @Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Exception.class)
  public void sendEvents(String topic, CustomerModel customer) {
    LOGGER.info("Sending to Kafka: topic: {}, key: {}, customer: {}", topic, customer.getKey(), customer);
//    kafkaTemplate.send(topic, customer.getKey(), customerConverter.convertToAvro(customer));
    kafkaTemplate.executeInTransaction(kt -> kt.send(topic, customer.getKey(), customerConverter.convertToAvro(customer)));
    customerRepository.saveToDb();
  }
}


因此,我在saveToDb方法中显式抛出一个异常,并且可以看到异常抛出。但是消费者应用程序仍然可以看到消息。

消费者代码:

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, Customer>(-1));


//    SeekToCurrentErrorHandler errorHandler =
//        new SeekToCurrentErrorHandler((record, exception) -> {
//          // recover after 3 failures - e.g. send to a dead-letter topic
////          LOGGER.info("***in error handler data, {}", record);
////          LOGGER.info("***in error handler headers, {}", record.headers());
////          LOGGER.info("value: {}", new String(record.headers().headers("springDeserializerExceptionValue").iterator().next().value()));
//        }, 3);
//
//    factory.setErrorHandler(errorHandler);

    return factory;
  }

  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    props.put("schema.registry.url", "http://127.0.0.1:8081");
    props.put("specific.avro.reader", "true");
    props.put("isolation.level", "read_committed");

//    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // disable auto commit of offsets
    return props;
  }
}

@Component
@Slf4j
public class KafkaConsumerService {

  @KafkaListener(id = "demo-consumer-stream-group", topics = "customer.txn")
  @Transactional
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }
}

我错过什么了吗?

共有1个答案

东门楚
2023-03-14

执行InTransaction将在单独的事务中运行。请参阅javadocs:

/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

只需使用send()即可参与现有事务。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少 我为这个问题创建了一个测试用例: 运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加:

  • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用