当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka活页夹:“试图从状态IN_TRANSACTION转换到状态IN_TRANSACTION无效”

贺光华
2023-03-14

我在生产者中安装了Apache Kafka“Kafka2.11-1.0.0”并定义了“TransactionIDPrefix”,我知道这是在Spring Kafka中启用事务所需要做的唯一一件事,但是当我这样做并在同一个应用程序中运行简单的源和接收器绑定时,我会看到消费者收到并打印了一些消息,而有些消息会出现错误。

例如,收到的消息#6:

[49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}]

但是消息#7有一个错误“试图从状态IN_TRANSACTION转换到状态IN_TRANSACTION无效”:

2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575)
  • 此错误表示什么?
  • 我的配置缺少什么吗?
  • 启用事务时,是否需要以不同的方式实现源或接收器?

找不到如何在启用Kafka绑定+Trasanctions的情况下使用Spring Cloud Stream的示例

为了重现,需要使用spring boot版本“2.0.0.M5”和“Spring-Cloud-Stream-Dependencies”版本“elmhurst.m3”创建一个简单的maven项目,并使用以下配置创建一个简单的应用程序:

server:
  port: 8082
spring:
  kafka:
    producer:
      retries: 5555
      acks: "all"
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          transaction:
            transactionIdPrefix: my-transaction-
      bindings:
        output1:
          destination: test10
          group: test111
          binder: kafka
        input1:
          destination: test10
          group: test111
          binder: kafka
          consumer:
            partitioned: true

我还创建了简单的源和接收器类:

@EnableBinding(SampleSink.MultiInputSink.class)
public class SampleSink {

    @StreamListener(MultiInputSink.INPUT1)
    public synchronized void receive1(Message<?> message) {
        System.out.println("["+Thread.currentThread().getId()+"] Received message " + message);
    }

    public interface MultiInputSink {
        String INPUT1 = "input1";

        @Input(INPUT1)
            SubscribableChannel input1();

    }
}

和:

@EnableBinding(SampleSource.MultiOutputSource.class)
public class SampleSource {

    AtomicInteger atomicInteger = new AtomicInteger(1);

    @Bean
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public synchronized MessageSource<String> messageSource1() {
        return new MessageSource<String>() {
            public Message<String> receive() {
                String message = "FromSource1 "+atomicInteger.getAndIncrement();
                m.put("my-transaction-id","my-id-"+ UUID.randomUUID());
                return new GenericMessage(message, new MessageHeaders(m));
            }
        };
    }

    public interface MultiOutputSource {
        String OUTPUT1 = "output1";

        @Output(OUTPUT1)
            MessageChannel output1();

    }
}

共有1个答案

慕凌龙
2023-03-14

我在这个项目的GitHub上开了一张票。请参考那里的答案和讨论:

https://github.com/spring-cloud/spring-cloud-stream/issues/1166

但第一个答案是:

 类似资料:
  • 状态转换的一般含义是,相同情况的不同形式,并且根据含义,状态转换方法也是如此。当不同的输入值赋予相同的函数时,它用于捕获软件应用程序的行为。 我们都使用过自动取款机,当从中取款时,它会显示帐户详细信息。现在再次进行另一次交易,然后再次显示帐户详细信息,但第二次交易后显示的详细信息与第一次交易不同,但两个详细信息都使用ATM的相同功能显示。所以这里使用了相同的函数,但每次输出不同时,这称为状态转换。

  • 帮助用户快速部署无状态的应用。 无状态应用即Deployment,Deployment的详细介绍内容,请参考kubernets官方文档-Deployments。 Pod是Kubernetes的最小编排单位,无状态deployment应用即通过声明pod模板等信息编排部署无状态应用,适用于pod完全一样、没有顺序、无所谓运行在哪台主机的应用。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中

  • 我引用了前台和后台接收firebase通知的答案-->https://stackoverflow.com/A/38451582/12553303 其实下面是我的疑问:-------- 如果我没有为前台条件编码(谈论推送通知),当我的应用程序在后台时,我仍然会得到通知,对吗????-->是的 但是,当im处于前台状态时,我从firebase推送了一个通知-->我不会在状态栏上看到通知,这也是可以的(

  • 在我的React JS项目中,我正在处理。我已经通过了使用进行私有路由和身份验证的示例。 https://reacttraining.com/react-router/web/example/auth-workflow 根据这个留档,他们创建了一个作为无状态组件。 但我的要求是将其转换为有状态的React组件,因为我想将我的组件连接到redux存储。 这是我的代码。 无状态组件 我将这个组件转换成

  • M600 智能手表通过小尺寸指示图标显示它的当前状态。 以下状态图标可能出现在 M600 的主页屏幕上。 M600 与手机断开连接 电池正在充电 飞行模式已开启 GPS 位置监控功能开启 “勿扰模式”已开启 您有新通知