我想在事务中使用SpringKafka,但我真的不明白应该如何配置它以及它是如何工作的。
这是我的配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
此配置用于事务id前缀为的DefaultKafkaProducerFactory:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
问题一:
我应该如何选择这个交易ID前缀?如果我理解正确,这个前缀被Spring用来为创建的每个生产者生成一个事务性id。
为什么我们不能只使用"UUID。随机UUID()?
问题二:
如果生产者被销毁,它将生成一个新的事务性id。因此,如果应用程序崩溃,在重新启动时它将重用旧的事务性id。
这正常吗???
问题3:
我正在使用部署在云上的应用程序,它可以自动扩展/缩小。这意味着我的前缀无法修复,因为每个实例上的所有生产者都有冲突的事务性id。
我应该在其中添加一个随机部分吗?当实例缩小/增大或崩溃并重新启动时,是否需要恢复相同的前缀?
问题4:
最后但并非最不重要的一点是,我们正在使用Kafka的凭证。这似乎不起作用:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
在知道已生成事务ID的情况下,如何设置ACL?
编辑1
进一步阅读后,如果我理解正确。
如果从P0(分区)读取C0(消费者)。如果经纪人开始消费者再平衡。P0可以被分配给另一个消费者C1。这个消费者C1应该使用与之前C0相同的交易ID来防止重复(僵尸Geofence)?
在SpringKafka,你是如何做到这一点的?事务id似乎与使用者无关,因此与分区读取无关。
谢啦
>
由于僵尸Geofence,您不能使用随机TID-如果服务器崩溃,您可能会在主题中有一个永远不会完成的部分事务,并且不会从任何分区中消耗更多的事务交易。
这是故意的——出于上述原因。
同样,你不能随机化;基于上述原因。
例如,CloudFoundry有一个指示实例索引的环境变量。如果您使用的云平台不包含类似的内容,那么您必须以某种方式对其进行模拟。然后,在事务id中使用它:
spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
ACLs——我不能回答这个问题;我不熟悉Kafka的权限;最好是单独问一个问题。
我认为我们需要向Spring添加一些逻辑,以确保相同的事务id始终用于特定的主题/分区。
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
编辑
自这个答案(KIP-447)以来,情况发生了变化;如果你的经纪人是2.5。0或更高版本-请参阅。https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-一劳永逸https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-一次
使用Kafka作为微服务体系结构中的消息传递系统,我想知道哪一个是首选,是Spring Kafka,还是Spring Integration Kafka,为什么?还有,我们根据什么因素来决定选择哪一个?
我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }
我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执
我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,
我试图通过stackoverflow搜索这个问题,但找不到合理的答案。我对@Kafkalistener注释的方法的签名很好奇,我们需要遵循什么约定吗?。如果我们想通过方法注入传入spring管理的bean呢?我试图通过方法注入将服务类传递给kafka侦听器- 我得到以下例外- 如果我使相关服务类自动生成,工作正常。
如果我不想使用自动通信模式,sping提供了另一种方法。 spring kafkfa/#提交补偿为我们提供了以下关于提交补偿的信息: -当侦听器在处理记录后返回时提交偏移量 -处理poll()返回的所有记录后提交偏移量 -当poll()返回的所有记录都已被处理时,只要超过自上次提交以来的确认时间,就提交偏移量 -当poll()返回的所有记录都已被处理时,只要自上次提交以来已收到ackCount记录