当前位置: 首页 > 知识库问答 >
问题:

SpringKafka与交易

陈哲
2023-03-14

我想在事务中使用SpringKafka,但我真的不明白应该如何配置它以及它是如何工作的。

这是我的配置

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ACKS_CONFIG, "all");

此配置用于事务id前缀为的DefaultKafkaProducerFactory:

defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");

问题一:

我应该如何选择这个交易ID前缀?如果我理解正确,这个前缀被Spring用来为创建的每个生产者生成一个事务性id。

为什么我们不能只使用"UUID。随机UUID()?

问题二:

如果生产者被销毁,它将生成一个新的事务性id。因此,如果应用程序崩溃,在重新启动时它将重用旧的事务性id。

这正常吗???

问题3:

我正在使用部署在云上的应用程序,它可以自动扩展/缩小。这意味着我的前缀无法修复,因为每个实例上的所有生产者都有冲突的事务性id。

我应该在其中添加一个随机部分吗?当实例缩小/增大或崩溃并重新启动时,是否需要恢复相同的前缀?

问题4:

最后但并非最不重要的一点是,我们正在使用Kafka的凭证。这似乎不起作用:

Current ACLs for resource `TransactionalId:my_app*`:
    User:CN... has Allow permission for operations: All from hosts: *

在知道已生成事务ID的情况下,如何设置ACL?

编辑1

进一步阅读后,如果我理解正确。

如果从P0(分区)读取C0(消费者)。如果经纪人开始消费者再平衡。P0可以被分配给另一个消费者C1。这个消费者C1应该使用与之前C0相同的交易ID来防止重复(僵尸Geofence)?

在SpringKafka,你是如何做到这一点的?事务id似乎与使用者无关,因此与分区读取无关。

谢啦

共有1个答案

蓬长恨
2023-03-14

>

  • 由于僵尸Geofence,您不能使用随机TID-如果服务器崩溃,您可能会在主题中有一个永远不会完成的部分事务,并且不会从任何分区中消耗更多的事务交易。

    这是故意的——出于上述原因。

    同样,你不能随机化;基于上述原因。

    例如,CloudFoundry有一个指示实例索引的环境变量。如果您使用的云平台不包含类似的内容,那么您必须以某种方式对其进行模拟。然后,在事务id中使用它:

    spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
    

    ACLs——我不能回答这个问题;我不熟悉Kafka的权限;最好是单独问一个问题。

    我认为我们需要向Spring添加一些逻辑,以确保相同的事务id始终用于特定的主题/分区。

    https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929

    编辑

    自这个答案(KIP-447)以来,情况发生了变化;如果你的经纪人是2.5。0或更高版本-请参阅。https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-一劳永逸https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-一次

  •  类似资料:
    • 使用Kafka作为微服务体系结构中的消息传递系统,我想知道哪一个是首选,是Spring Kafka,还是Spring Integration Kafka,为什么?还有,我们根据什么因素来决定选择哪一个?

    • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }

    • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

    • 我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,

    • 我试图通过stackoverflow搜索这个问题,但找不到合理的答案。我对@Kafkalistener注释的方法的签名很好奇,我们需要遵循什么约定吗?。如果我们想通过方法注入传入spring管理的bean呢?我试图通过方法注入将服务类传递给kafka侦听器- 我得到以下例外- 如果我使相关服务类自动生成,工作正常。

    • 如果我不想使用自动通信模式,sping提供了另一种方法。 spring kafkfa/#提交补偿为我们提供了以下关于提交补偿的信息: -当侦听器在处理记录后返回时提交偏移量 -处理poll()返回的所有记录后提交偏移量 -当poll()返回的所有记录都已被处理时,只要超过自上次提交以来的确认时间,就提交偏移量 -当poll()返回的所有记录都已被处理时,只要自上次提交以来已收到ackCount记录