当前位置: 首页 > 知识库问答 >
问题:

春云流中的事务

楚天宇
2023-03-14

问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。

技术:Spring boot、Spring cloud stream、RabbitMQ

你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。

  @Transactional
  public void sendToQueue(List<Data> dataList) {

      for(Data data:dataList)
      {
          this.output.send(MessageBuilder.withPayload(data).build());
          counter++; // I can see message getting published in the queue though management plugin
      }
      LOGGER.debug("message sent to Q2");

  }
spring: 
   cloud:    
      stream:
        bindings:
           # Q1 input channel
           tpi_q1_input:
            destination: TPI_Q1
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 output channel  
           tpi_q2_output:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 input channel
           tpi_q2_input:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService     
        binders:
          local_rabbit:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
                  virtual-host: /
          rabbit:
            bindings:
                  tpi_q2_output:
                    producer:
                          #autoBindDlq: true
                          transacted: true
                          #batchingEnabled: true
                  tpi_q2_input:  
                   consumer:
                        acknowledgeMode: AUTO
                        #autoBindDlq: true
                        #recoveryInterval: 5000
                        transacted: true       

spring.cloud.stream.default-binder: local_rabbit
@EnableTransactionManagement
public class QueueConfig {

  @Bean
  public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
  }
}
@StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
  @Transactional
  public void receiveMesssage(Data data) {

    logger.info("Message Received in Q2:");
  }

共有1个答案

邬良才
2023-03-14

>

  • 将生成器配置为使用事务...producer.transacted=true

    在事务范围内发布消息(使用RabbitTransactionManager)。

    对#2使用普通的Spring事务机制(@transact注释或TransactionTemplate)。

    @SpringBootApplication
    @EnableBinding(Source.class)
    @EnableTransactionManagement
    public class So50372319Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50372319Application.class, args).close();
        }
    
        @Bean
        public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
                TransactionalSender sender) {
            admin.deleteQueue("so50372319.group");
            admin.declareQueue(new Queue("so50372319.group"));
            admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
            return args -> {
                sender.send("foo", "bar");
                System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
                System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
                try {
                    sender.send("baz", "qux");
                }
                catch (RuntimeException e) {
                    System.out.println(e.getMessage());
                }
                System.out.println("Received: " + template.receive("so50372319.group", 3_000));
            };
        }
    
        @Bean
        public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
            return new RabbitTransactionManager(cf);
        }
    
    }
    
    @Component
    class TransactionalSender {
    
        private final MessageChannel output;
    
        public TransactionalSender(MessageChannel output) {
            this.output = output;
        }
    
        @Transactional
        public void send(String... data) {
            for (String datum : data) {
                this.output.send(new GenericMessage<>(datum));
                if ("qux".equals(datum)) {
                    throw new RuntimeException("fail");
                }
            }
        }
    
    }
    

    spring.cloud.stream.bindings.output.destination=output
    spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
    

    Received: foo
    Received: bar
    fail
    Received: null
    

  •  类似资料:
    • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

    • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

    • 如何提供一个在条件不匹配时调用的处理程序? 如果我有两个处理程序,第一个有条件,第二个没有条件,当第一个条件匹配时,两个处理程序都被调用。我如何避免这种情况?

    • 我试图在我的应用程序中集成spring cloud stream kinesis,但我找不到手册中的所有配置选项。我看过这个链接: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/as

    • 我不知道spring-cloud-gateway是否支持从领事注册中心读取路由,就像Zuul一样。 另外,我用--debug启动了这一行:

    • 是否可以使用函数()样式,使用多个独立的函数/绑定来实现反应性SCS应用程序?我发现的所有示例总是只注册一个具有默认绑定的函数bean。我想注册多个,每个都有自己的绑定。 传统上,这可以使用来完成,但现在不推荐使用函数支持。