当前位置: 首页 > 知识库问答 >
问题:

带有消费者/生产者API的Spring Cloud Stream for Kafka,具有事务id前缀的精确一次语义未按预期工作

仲皓君
2023-03-14

我有一个场景,我看到了不同的行为。共有3种不同的服务

  • 第一个服务将从Solace队列监听并将其生成到kafka topic-1(启用事务的地方)
  • 第二服务将从上面的kafka主题-1监听并将其写入另一个kafka主题-2(我们没有手动提交,启用事务以生成到其他主题,自动提交偏移为false

现在,在我在第二个服务中启用事务和隔离级别后,问题是我无法读取任何消息,如果我在第二个服务中禁用了事务,我就能够读取所有消息。

  • 我们可以启用事务吗

编辑:下面是我的yml的样子

 - kafka:
   - binder:
     - transaction:
         - transaction-id-prefix:
       - brokers: 
         - configuration: 
               all my consumer properties (ssl, sasl)

已更新(yml with spring cloud):

spring: 
  cloud.stream:
      bindings:
        input:
          destination: test_input
          content-type: application/json
          group: test_group
        output:
          destination: test_output
          content-type: application/json
      kafka.binder: 
          configuration: 
            isolation.level: read_committed
            security.protocol: SASL_SSL
            sasl.mechanism: GSSAPI
            sasl.kerberos.service.name: kafka
            ssl.truststore.location: jks
            ssl.truststore.password: 
            ssl.endpoint.identification.algorithm: null            
          brokers: broker1:9092,broker2:9092,broker3:9092
          auto-create-topics: false
          transaction:
            transaction-id-prefix: trans-2
            producer:
              configuration:
                retries: 2000
                acks: all
                security.protocol: SASL_SSL
                sasl.mechanism: GSSAPI
                sasl.kerberos.service.name: kafka
                ssl.truststore.location: jks
                ssl.truststore.password: 
                ssl.endpoint.identification.algorithm: null

更新(带有Spring kafka的yml):

spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    consumer:
      properties:
        isolation.level: read_committed
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    producer:
      transaction-id-prefix: trans-2
      retries: 2000
      acks: all
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    admin:
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka

使用动态目标更新

Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]

尝试了动态目标解析器问题的两种方法:动态目标解析器

共有1个答案

墨财
2023-03-14

这对我来说很好;这些都在同一个应用程序中,但这不会有什么不同。。。

@SpringBootApplication
@EnableBinding(Channels.class)
public class So55419549Application {

    public static void main(String[] args) {
        SpringApplication.run(So55419549Application.class, args);
    }

    @Bean
    public IntegrationFlow service1(MessageChannel out1) {
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
                .log(Level.INFO, m -> "s1 " + m.getPayload())
                .channel(out1)
                .get();
    }

    @StreamListener("in2")
    @SendTo("out2")
    public String service2(String in) {
        System.out.println("s2 " + in);
        return in.toUpperCase();
    }

    @StreamListener("in3")
    public void service3(String in) {
        System.out.println("s3 " + in);
    }

}

interface Channels {

    @Output
    MessageChannel out1();

    @Input
    MessageChannel in2();

    @Output
    MessageChannel out2();

    @Input
    MessageChannel in3();

}

spring:
  cloud:
    stream:
      bindings:
        out1:
          destination: topic1
        in2:
          group: s2
          destination: topic1
        out2:
          destination: topic2
        in3:
          group: s3
          destination: topic2
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx
        bindings:
          in2:
            consumer:
              configuration:
                isolation:
                  level: read_committed
          in3:
            consumer:
              configuration:
                isolation:
                  level: read_committed
  kafka:
    producer:
      # needed again here so boot declares a TM for us
      transaction-id-prefix: tx
      retries: 10
      acks: all
logging:
  level:
    org.springframework.kafka.transaction: debug

2019-03-29 12:57:08.345  INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   
    : s1 foo
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
s2 foo
2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
s3 FOO

编辑

绑定器未在事务管理器上启用事务同步。作为解决方法,添加

TransactionSynchronizationManager.setActualTransactionActive(true);

发送到您的StreamListener。

我在活页夹上打开了一个错误。

 类似资料:
  • 对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。

  • 我正在编写一个程序,其中几个生产者生成一些应该由几个消费者处理的数据。由于每条数据的消耗大约需要100ms,而目标平台有很多处理器,所以在我看来,每个生产者和每个消费者都得到自己的线程似乎是很自然的。我的问题是:Qt信号/插槽是将数据块从生产者传递到消费者的好方法吗?还是建议更好的解决方案(强烈首选Qt)。 为了防患于未然,制作者每小时产生几十万个数据。

  • 我发现Kafka有一些非常奇怪的地方。 我有一个制片人,有三个经纪人: 然后,我尝试使用新的API运行消费者: 我什么都没有!但是如果我使用旧的API: 我收到我的留言了! 我怎么了? PS:我用的是Kafka10

  • 我正在与Java一起研究生产者-消费者问题的多生产者和消费者用例。代码在GitHub上。同样的实现适用于单个生产者和消费者用例,但对于多生产者和消费者用例却表现得很奇怪。 我有一些关于输出的问题: 一开始,所有生产者和一个消费者都有锁: 我想所有的线程都应该竞争锁,并且最多应该有一个线程拥有所有时间的锁?是不是所有的制作人都共用这个锁?当生产者线程t1持有锁时,使用者线程t5是如何获得锁的? 它运

  • 我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。 我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。 我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。 我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做? 下面是一些代码: 搜索机制: } 打印机: }

  • 与服务人员一起进行脱机工作,从https://angular.io/guide/service-worker-config 应用程序脱机工作正常唯一的问题是,如果我使用应用程序脱机工作正常,但缓存中的文件在服务器上的文件更新时从不更新。如果我使用“installMode”:“lazy”`应用程序无法脱机工作。 以下是。 还尝试了,但仍然缓存未在更改时更新的文件