当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream项目无法获取分区信息错误

梁丘成和
2023-03-14

使用此配置时:

spring:
  cloud:
    stream:
      kafka:
        binder:
          min-partition-count: 1
          replication-factor: 1
  kafka:
    producer:
      transaction-id-prefix: tx-
      retries: 1
      acks: all

我的应用程序启动正确,但事务性。我在控制台输出中看到的id显示为null。我已经将这个额外的配置(事务)应用于spring cloud stream,以获得正确的事务。身份证件:

spring:
  cloud:
    stream:
      kafka:
        binder:
          min-partition-count: 1
          replication-factor: 1
          transaction:
            transaction-id-prefix: txl-
  kafka:
    producer:
      transaction-id-prefix: tx-
      retries: 1
      acks: all

但服务未成功启动,控制台输出显示:

app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.435  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.437  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.437  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1606336069435
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.597  INFO [poc,,,] 1 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   acks = -1
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   batch.size = 16384
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   bootstrap.servers = [kafka:29092]
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   buffer.memory = 33554432
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   client.dns.lookup = default
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   client.id = producer-txl-1
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   compression.type = none
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   connections.max.idle.ms = 540000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   delivery.timeout.ms = 120000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   enable.idempotence = true
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   interceptor.classes = []
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   linger.ms = 0
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   max.block.ms = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   max.in.flight.requests.per.connection = 5
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   max.request.size = 1048576
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metadata.max.age.ms = 300000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metadata.max.idle.ms = 300000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metric.reporters = []
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metrics.num.samples = 2
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metrics.recording.level = INFO
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   metrics.sample.window.ms = 30000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   receive.buffer.bytes = 32768
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   reconnect.backoff.max.ms = 1000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   reconnect.backoff.ms = 50
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   request.timeout.ms = 30000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   retries = 1
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   retry.backoff.ms = 100
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.client.callback.handler.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.jaas.config = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.kerberos.kinit.cmd = /usr/bin/kinit
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.kerberos.min.time.before.relogin = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.kerberos.service.name = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.kerberos.ticket.renew.jitter = 0.05
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.kerberos.ticket.renew.window.factor = 0.8
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.callback.handler.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.refresh.buffer.seconds = 300
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.refresh.min.period.seconds = 60
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.refresh.window.factor = 0.8
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.login.refresh.window.jitter = 0.05
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   sasl.mechanism = GSSAPI
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   security.protocol = PLAINTEXT
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   security.providers = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   send.buffer.bytes = 131072
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.cipher.suites = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.enabled.protocols = [TLSv1.2]
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.endpoint.identification.algorithm = https
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.key.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.keymanager.algorithm = SunX509
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.keystore.location = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.keystore.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.keystore.type = JKS
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.protocol = TLSv1.2
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.provider = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.secure.random.implementation = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.trustmanager.algorithm = PKIX
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.truststore.location = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.truststore.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   ssl.truststore.type = JKS
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   transaction.timeout.ms = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   transactional.id = txl-1
app_poc.1.nqc57nvh0qhr@ms-poc-02    |   value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.599  INFO [poc,,,] 1 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-txl-1, transactionalId=txl-1] Instantiated a transactional producer.
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.623  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.624  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.624  INFO [poc,,,] 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1606336069623
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.626  INFO [poc,,,] 1 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-txl-1, transactionalId=txl-1] Invoking InitProducerId for the first time in order to acquire a producer ID
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:27:49.637  INFO [poc,,,] 1 --- [ producer-txl-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-txl-1, transactionalId=txl-1] Cluster ID: 3wV8FW9yTfKSVhNwNMoC2Q
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 2020-11-25 20:28:49.630 ERROR [poc,,,] 1 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
app_poc.1.nqc57nvh0qhr@ms-poc-02    | 
app_poc.1.nqc57nvh0qhr@ms-poc-02    | org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

无法获取分区信息我认为我的配置有问题(肯定)

我的意图是有一次,以避免重复。所以我想看transactional.id

额外信息:我的消费者同时使用JPA和Kafka事务(使用chainedKafkaTransactionManager进行事务同步)

编辑:在@Configuration类中,我有这些bean

   @Bean
    @Primary
    fun transactionManager(em: EntityManagerFactory): JpaTransactionManager {
        return JpaTransactionManager(em)
    }

    @Bean
    fun kafkaTransactionManager(producerFactory: ProducerFactory<Any, Any>): KafkaTransactionManager<*, *> {
        return KafkaTransactionManager(producerFactory)
    }

    @Bean
    fun chainedTransactionManager(
        kafkaTransactionManager: KafkaTransactionManager<String, String>,
        transactionManager: JpaTransactionManager,
    ): ChainedKafkaTransactionManager<Any, Any> {
        return ChainedKafkaTransactionManager(kafkaTransactionManager, transactionManager)
    }

    @Bean
    fun kafkaListenerContainerFactory(
        configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
        kafkaConsumerFactory: ConsumerFactory<Any, Any>,
        chainedKafkaTransactionManager: ChainedKafkaTransactionManager<Any, Any>,
    ): ConcurrentKafkaListenerContainerFactory<*, *> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        configurer.configure(factory, kafkaConsumerFactory)
        factory.containerProperties.transactionManager = chainedKafkaTransactionManager
        return factory
    }

和对应的@Transactional处理器类

@EnableKafka
@EnableBinding(Channels::class)
@Service
@Transactional
class EventProcessor()
...

根据我所展示的第一个配置,事务同步工作。

我使用此日志配置来确认TransactionSynchronizationManager的初始化事务同步和清除事务同步。

logging:
  level:
    org.springframework.kafka: trace
    org.springframework.transaction: trace

共有1个答案

晁砚
2023-03-14

看到这个答案。

您很可能没有足够的副本或同步副本用于事务日志主题。

使用ChainedKafkaTransactionManager

这仅在spring cloud stream(开箱即用)中支持生产者事务。对于消费-

或者,必须将正确配置的CKTM注入绑定的侦听器容器。

您需要显示您的代码和其余的配置。

 类似资料:
  • 基本信息 Path: /api/project/get Method: GET 接口描述: 请求参数 Query 参数名称 是否必须 示例 备注 token 是 项目token

  • 除了获取客户端的地区信息外,有时他们所在的时区信息也非常有用。LocaleContextResolver接口为LocaleResolver提供了拓展点,允许解析器在LocaleContext中提供更多的信息,这里面就可以包含时区信息。 如果用户的时区信息能被解析到,那么你总可以通过RequestContext.getTimeZone()方法获得。时区信息会自动被SpringConversionSe

  • 执行 gradle projects 命令会为你列出子项目名称列表. 例 11.7. 收集项目信息 gradle -q projects 命令的输出结果 > gradle -q projects ------------------------------------------------------------ Root project ----------------------------

  • 我得到这个错误(我激活了wp-debug)当我试图激活一个主题或当我试图保存后: 警告:无法修改标题信息-标题已由/mnt/webv/b3/13/56920413/htdocs/WordPress_02/wp content/themes/BackpackFamily/functions.php:58中的/mnt/webv/b3/13/56920413/htdocs/WordPress_02/wp

  • 执行 gradle properties 可以获取项目所有属性列表. 如下例: 例 11.16. 属性信息 gradle -q api:properties 的输出结果 > gradle -q api:properties ------------------------------------------------------------ Project :api - The shared A

  • 问题内容: 我正在开发一个Maven插件。我似乎很难弄清楚,从执行MOJO的项目中获取POM信息的好方法是什么? 例如,如果我在另一个Maven项目中执行我的mojo,我想获取项目名称或其他一些参数。 还有一件事,在AbstractMojo.java类中有一个上下文MAP,有私有Map pluginContext,如果我错了,有人可以纠正我,但这应该用于在mojos之间传递信息吗? 问题答案: 您