当前位置: 首页 > 知识库问答 >
问题:

KafkaListenerEndpointContainer无法使用Spring Kafka创建Kafka事务

段干玺
2023-03-14

我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次

我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolation.level=read_committed。

@Bean(name = "producerFactory")
        public ProducerFactory<String, MyObject> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
            configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
            configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"txApp");
            DefaultKafkaProducerFactory<String, KafkaSerializer> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
            producerFactory.setTransactionIdPrefix("tx.");

                    return producerFactory;
        }



@Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager() {
        KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
        // ...
        return kafkaTransactionManager;
    }

@Bean(name="appTemplate")
    public KafkaTemplate<String,MyObject> kafkaTemplate(){
        KafkaTemplate<String, MyObject> kafkaTemplate = new KafkaTemplate<>(
                producerFactory());
        return kafkaTemplate;
    }

//Consumer

@Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                                                                          ConsumerFactory kafkaConsumerFactory,
                                                                          KafkaTransactionManager kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        return factory;
    }

//in the Consumer
   @KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
    public void listener(@Payload MyObject message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {

...

// In my producer

myTemplate.executeInTransaction(t-> t.send(kafkaConfig.getTopicName(), myMessage));

我希望看到消息到达我的监听器,但当我执行生产者时,我得到以下错误:

22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller=  - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment 
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

共有1个答案

姜俊逸
2023-03-14

看一看服务器日志;很可能您没有足够的副本来支持事务(默认情况下为3个)。如果只是测试,可以将其设置为1。

请参阅代理属性transaction.state.log.replication.factormin.insync.replicas

事务主题的复制因子(设置较高以确保可用性)。在群集大小满足此复制因子要求之前,内部主题创建将失败。

当生产者将ACK设置为“全部”(或“-1”)时,此配置指定必须确认写入的副本的最小数量,以便将写入视为成功。如果不能满足此最小值,则生成器将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当结合使用min.insync.replicas和acks时,您可以强制执行更高的耐久性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用“all”的ACK进行生产。这将确保如果大多数副本没有收到写操作,则生成器会引发异常。

 类似资料:
  • 我还更改了zookeeper中的zoo.cfg。 和server.properties。 我看了所有的教程,并做了完全相同的方法。还有乌斯金Kafka开放式动物园管理员。 3)创建主题 .\bin\windows\kafka-topics.bat--create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic hel

  • 我正在尝试在分布式模式下运行Kafka辅助角色。与独立模式不同,我们在分布式模式下启动辅助角色时无法传递连接器属性文件。在分布式模式下,辅助角色是单独启动的,我们使用REST API部署和管理这些辅助角色上的连接器 参考链接 - https://docs.confluent.io/current/connect/managing/configuring.html#connect-managing-

  • 我试图连接到一个Kafka集群通过SASL_SSL协议与jaas配置如下: 上面的配置与Spring-Cloud-stream官方git repo上提供的示例配置是内联的。 在库的git repo上提出的类似问题说,它在最新版本中已经修复,但似乎没有。得到以下错误: SpringBootVersion: 2.2.8和Spring-Cloud-stream-依赖版本-Horsham。SR6. 这让我

  • 我们有一个Spring Boot应用程序,它使用来自IBM MQ的消息进行一些转换,并将结果发布到Kafka主题。我们使用https://spring.io/projects/spring-kafka为了这个。我知道Kafka不支持XA;然而,在文档中,我找到了一些关于使用ChainedKafkaTransactionManager链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明

  • 问题内容: 我无法在数据库(mySQL)中创建表,使用并尝试使用以下命令输入未来表的名称: 然后,在用户输入表名称之后,我尝试构造并调用该语句: 如果我尝试不输入名称就执行它(如常量字符串:“ CREATE TABLE newtable(…)”,但我需要输入名称),它将很好地工作。 问题答案: 阅读表名后,您将必须格式化字符串,例如: 然后创建像:

  • 我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,