当前位置: 首页 > 知识库问答 >
问题:

Spring云数据流多主题事务管理

夏季萌
2023-03-14

我正在尝试用Java创建一个PoC应用程序,以了解在使用Kafka进行消息发布时如何在Spring Cloud Stream中进行事务管理。我试图模拟的用例是一个接收消息的处理器。然后,它进行一些处理,并生成两条发送到两个不同主题的新消息。我希望能够将这两条消息作为单个事务发布。因此,如果发布第二条消息失败,我希望滚动(而不是提交)第一条消息。SpringCloudStream支持这样的用例吗?

我已经设置了@Transactional注释,并且我可以看到一个全局事务在消息传递给消费者之前开始。但是,当我试图通过MessageChannel发布消息时。send()method我可以看到在KafkaProducerMessageHandlerclass'handleRequestMessage()方法中启动并完成了一个新的本地事务。这意味着消息的发送不参与全局事务。因此,如果在发布第一条消息后引发异常,则不会回滚该消息。全局事务被回滚,但由于第一条消息已经提交,因此实际上没有做任何事情。

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer: # these apply to all producers that participate in the transaction
              partition-key-extractor-name: partitionKeyExtractorStrategy
              partition-selector-name: partitionSelectorStrategy
              partition-count: 3
              configuration:
               acks: all
               enable:
                 idempotence: true
               retries: 10
        bindings:
          input-customer-data-change-topic:
            consumer:
              configuration:
                isolation:
                  level: read_committed
              enable-dlq: true
      bindings:
        input-customer-data-change-topic:
          content-type: application/json
          destination: com.fis.customer
          group: com.fis.ec
          consumer:
            partitioned: true
            max-attempts: 1
        output-name-change-topic:
          content-type: application/json
          destination: com.fis.customer.name          
        output-email-change-topic:
          content-type: application/json
          destination: com.fis.customer.email
@SpringBootApplication
@EnableBinding(CustomerDataChangeStreams.class)
public class KafkaCloudStreamCustomerDemoApplication
{
   public static void main(final String[] args)
   {
      SpringApplication.run(KafkaCloudStreamCustomerDemoApplication.class, args);
   }
}
public interface CustomerDataChangeStreams
{
   @Input("input-customer-data-change-topic")
   SubscribableChannel inputCustomerDataChange();

   @Output("output-email-change-topic")
   MessageChannel outputEmailDataChange();

   @Output("output-name-change-topic")
   MessageChannel outputNameDataChange();
}
@Component
public class CustomerDataChangeListener
{
   @Autowired
   private CustomerDataChangeProcessor mService;

   @StreamListener("input-customer-data-change-topic")
   public Message<String> handleCustomerDataChangeMessages(
      @Payload final ImmutableCustomerDetails customerDetails)
   {
      return mService.processMessage(customerDetails);
   }
}
@Component
public class CustomerDataChangeProcessor
{
   private final CustomerDataChangeStreams mStreams;

   @Value("${spring.cloud.stream.bindings.output-email-change-topic.destination}")
   private String mEmailChangeTopic;

   @Value("${spring.cloud.stream.bindings.output-name-change-topic.destination}")
   private String mNameChangeTopic;

   public CustomerDataChangeProcessor(final CustomerDataChangeStreams streams)
   {
      mStreams = streams;
   }

   public void processMessage(final CustomerDetails customerDetails)
   {
      try
      {
         sendNameMessage(customerDetails);
         sendEmailMessage(customerDetails);
      }
      catch (final JSONException ex)
      {
         LOGGER.error("Failed to send messages.", ex);
      }
   }

   public void sendNameMessage(final CustomerDetails customerDetails)
      throws JSONException
   {
      final JSONObject nameChangeDetails = new JSONObject();
      nameChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
      nameChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
      nameChangeDetails.put(KafkaConst.FIRST_NAME_KEY, customerDetails.firstName());
      nameChangeDetails.put(KafkaConst.LAST_NAME_KEY, customerDetails.lastName());
      final String action = customerDetails.action();
      nameChangeDetails.put(KafkaConst.ACTION_KEY, action);

      final MessageChannel nameChangeMessageChannel = mStreams.outputNameDataChange();
      emailChangeMessageChannel.send(MessageBuilder.withPayload(nameChangeDetails.toString())
         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
         .setHeader(KafkaHeaders.TOPIC, mNameChangeTopic).build());

      if ("fail_name_illegal".equalsIgnoreCase(action))
      {
         throw new IllegalArgumentException("Customer name failure!");
      }
   }

   public void sendEmailMessage(final CustomerDetails customerDetails) throws JSONException
   {
      final JSONObject emailChangeDetails = new JSONObject();
      emailChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
      emailChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
      emailChangeDetails.put(KafkaConst.EMAIL_ADDRESS_KEY, customerDetails.email());
      final String action = customerDetails.action();
      emailChangeDetails.put(KafkaConst.ACTION_KEY, action);

      final MessageChannel emailChangeMessageChannel = mStreams.outputEmailDataChange();
      emailChangeMessageChannel.send(MessageBuilder.withPayload(emailChangeDetails.toString())
         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
         .setHeader(KafkaHeaders.TOPIC, mEmailChangeTopic).build());

      if ("fail_email_illegal".equalsIgnoreCase(action))
      {
         throw new IllegalArgumentException("E-mail address failure!");
      }
   }
}

编辑

我们越来越近了。不再创建本地事务。但是,即使出现异常,全局事务仍会被提交。据我所知,异常不会传播到TransactionTemplate。execute()方法。因此,事务被提交。似乎sendMessage()方法中的MessageProducerSupport类“吞咽”了catch子句中的异常。如果定义了一个错误通道,则会向其发布一条消息,因此不会重新引发异常。我试图关闭错误频道(spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled=false),但这并没有关闭它。因此,为了进行测试,我只需在调试器中将错误通道设置为null,以强制重新调用异常。似乎就是这样。但是,原始消息会不断重新传递给初始消费者,即使我将该消费者的最大尝试次数设置为1。


共有1个答案

关飞翼
2023-03-14

请参阅文档。

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

启用活页夹中的事务。见交易。Kafka文档中的id和springKafka文档中的事务。启用事务时,将忽略单个生产者属性,并且所有生产者都使用spring。云流动Kafka。粘合剂交易制作人。*财产。

默认为空(无事务)

spring。云流动Kafka。粘合剂交易制片人*

事务绑定器中生产者的全局生产者属性。见Spring。云流动Kafka。粘合剂交易transactionIdPrefix和Kafka生产者属性以及所有活页夹支持的一般生产者属性。

默认设置:请参见各个生产者属性。

必须配置共享的全局生产者。

不要添加@Transactional-在提交事务之前,容器将启动事务并向事务发送偏移量。

如果侦听器抛出异常,则事务将回滚,并且DefaultAfterRollback Post处理器将重新查找主题/分区,以便重新传递记录。

编辑

绑定器的事务管理器配置中存在一个错误,该错误导致输出绑定启动新的本地事务。

要解决此问题,请使用以下容器定制器bean重新配置TM。。。

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
    return (container, dest, group) -> {
        KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                .getTransactionManager();
        tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    };
}

编辑2

您不能使用活页夹的DLQ支持,因为从容器的角度来看,交付是成功的。我们需要将异常传播到容器以强制回滚。因此,您需要将死信移到AfterRollbackProcessor。以下是我的完整测试课程:

@SpringBootApplication
@EnableBinding(Processor.class)
public class So57379575Application {

    public static void main(String[] args) {
        SpringApplication.run(So57379575Application.class, args);
    }

    @Autowired
    private MessageChannel output;

    @StreamListener(Processor.INPUT)
    public void listen(String in) {
        System.out.println("in:" + in);
        this.output.send(new GenericMessage<>(in.toUpperCase()));
        if (in.equals("two")) {
            throw new RuntimeException("fail");
        }
    }

    @KafkaListener(id = "so57379575", topics = "so57379575out")
    public void listen2(String in) {
        System.out.println("out:" + in);
    }

    @KafkaListener(id = "so57379575DLT", topics = "so57379575dlt")
    public void listen3(String in) {
        System.out.println("dlt:" + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("so57379575in", "one".getBytes());
            template.send("so57379575in", "two".getBytes());
        };
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            KafkaTemplate<Object, Object> template) {

        return (container, dest, group) -> {
            // enable transaction synchronization
            KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                    .getTransactionManager();
            tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
            // container dead-lettering
            DefaultAfterRollbackProcessor<? super byte[], ? super byte[]> afterRollbackProcessor =
                    new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(template,
                            (ex, tp) -> new TopicPartition("so57379575dlt", -1)), 0);
            container.setAfterRollbackProcessor(afterRollbackProcessor);
        };
    }

}

spring:
  kafka:
    bootstrap-servers:
    - 10.0.0.8:9092
    - 10.0.0.8:9093
    - 10.0.0.8:9094
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        isolation.level: read_committed
  cloud:
    stream:
      bindings:
        input:
          destination: so57379575in
          group: so57379575in
          consumer:
            max-attempts: 1
        output:
          destination: so57379575out
      kafka:
        binder:
          transaction:
            transaction-id-prefix: so57379575tx.
            producer:
              configuration:
                acks: all
                retries: 10

#logging:
#  level:
#    org.springframework.kafka: trace
#    org.springframework.transaction: trace

in:two
2019-08-07 12:43:33.457 ERROR 36532 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while 
...
Caused by: java.lang.RuntimeException: fail
...
in:one
dlt:two
out:ONE

 类似资料:
  • 我试图编写一个spring-cloud-stream函数(spring-starter-parent 2.5.3,java 11,spring-cloud-version 2020.0.3),该函数同时具有Kafka和Postgres事务。每当使用的消息以字符串“fail”开始时,该函数将引发一个模拟错误,我希望这将导致数据库事务回滚,然后导致kafka事务回滚。(我知道Kafka交易不是XA,这

  • 我正在使用docker部署spring云数据流服务器。我在dataflow服务器内部创建了一个数据处理管道,通过部署两个spring boot应用程序作为源、处理器和接收器。为了访问每个服务的日志,我必须从docker continer(bash)内部跟踪它,或者将其从docker容器复制到本地磁盘。 我想使用log4j-kafka appender将这些日志推送给kafka以供以后分析。我已经为

  • 我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。 这是我的lib版本: 我的Spring云流形态: 我的测试java代码: 要运行的测试类: 我还添加了TransactionManager: 在最后的这个示例中,我的兔子队列中有: 或者我应该只有两条消息test.other.request.

  • 在我的spring服务中,我调用了两个spring数据存储库方法 现在我的查询与事务管理相关。就我所了解和看到的代码而言,spring存储库使用@Transactional为其方法启用了事务。对于select操作,它的readonly=true。 我对事务的理解是,当执行选择操作时,会创建一个事务,然后为保存操作创建另一个事务,因为对于选择操作,事务只读=true。 我希望在单个事务中执行读写操作

  • 在spring cloud dataflow中,根据我的理解,每个流都是一个微服务,但数据流服务器不是。我说的对吗?

  • 我试图了解运行批处理任务时通过Spring Cloud数据流WRT数据源配置的预期行为。 Spring批处理数据库表(Batch\u JOB\u EXECUTION等)是否在SCDF数据库本身中?当通过SCDF启动任务时,似乎发生了一些神奇的事情,它在SCDF数据库中创建了这些表,并似乎在使用它们。它似乎正在将SCDF数据源注入我的应用程序? 我目前在localhost服务器版本2.0.1上运行。