当前位置: 首页 > 知识库问答 >
问题:

如何在具有事务的反应式生产者中从无法访问的Kafka中恢复?

常雪风
2023-03-14

我有一个简单的反应式Kafka生产者,我需要它来手动提交事务。在Reactor Kafka for method begin()中的javadoc之后,我创建了一个producer方法

@Autowired
private ReactiveKafkaProducerTemplate<String, String> kafkaTemplate;

public Mono<Void> send(String message) {
    return kafkaTemplate.transactionManager()
            .begin()
            .then(kafkaTemplate.send("my.test.topic", message))
            .then(kafkaTemplate.transactionManager().commit());
}

其配置为

@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}

它运行良好,我正在向Kafka发送消息。但现在我需要保护它,使其免受我们公司潜在Kafka/网络中断的影响。为了模拟这种情况,我停止了我的Kafkadocker,发送了一条消息,一段时间后再次启动Kafka。但奇怪的事情正在发生。我遇到了三种可能的情况:

  1. 当我在大约一分钟内启动Kafka时,制作人恢复了连接和事务,并很好地完成了发送
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my.test.topic-0:219057 ms has passed since batch creation

此状态不可恢复,每次后续尝试都会失败

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

我必须重新启动应用程序

TransactionalId producer-tx-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

此状态也无法恢复,我必须重新启动应用程序。

如何从这种情况下恢复?我想实现的是生产者等待f. e.20秒,如果连接没有重新建立,那么就抛出异常并丢弃请求。最重要的是,系统不应该因为无法访问Kafka而崩溃。

起初,我认为需要使用kafkaTemplate手动中止事务。transactionManager()。abort()在某些中。doOnError()但这不起作用(在场景3中为f.e.,代码不会引发任何异常)。然后,我玩了一下ProducerConfig,尝试了TRANSACTION\u TIMEOUT\u CONFIG、REQUEST\u TIMEOUT\u MS\u CONFIG或DELIVERY\u TIMEOUT\u MS\u CONFIG,但请求没有超时,不确定我做错了什么。

共有1个答案

朱建弼
2023-03-14

>

之后,您将需要创建一个新的生产者并处理旧的生产者。

 类似资料:
  • 我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息

  • 对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。

  • 我是Kafka生态系统的新手,在我的例子中,我使用的是Java生产者,但不需要发送密钥以及序列化Avro的记录值。有没有办法构建一个Java生产者来不发送密钥,或者在Kafka中发送消息时需要密钥?

  • 当配置为事务生产者时,在面向请求(例如http或RPC服务器)的应用程序中管理Kafka生产者对象的最佳实践是什么?具体来说,如何在服务线程之间共享生产者对象,以及如何定义事务。这些对象的id配置值? 在非事务性使用中,生产者对象是线程安全的,并且在所有请求服务线程之间共享一个对象是很常见的。设置kafka消费者线程使用的事务性生产者对象也很简单,只需为每个消费者线程实例化一个对象即可工作得很好。

  • 我正在研究Kafka支持的事务生产者和这两个链接中描述的精确一次处理: 1)https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/2)https://www.confluent.io/blog/transactions-apache-kafka/ 对于用户可以

  • 通过 allowNon 事务属性,可以使用相同的 KafkaTemplate 来创建事务性和非事务性生产者。这两种生产者将使用来自Kafka生产者工厂的相同配置。 在某些特定情况下,可能需要为这两种生产者使用不同的配置。 下面的例子展示了一个具体的用例 KafkaTemplate-非事务性生产者- 问题可以通过拥有2个工厂和/或2个模板来解决,但为了避免样板代码和复杂性,我们的想法是保持相同的模板