当前位置: 首页 > 知识库问答 >
问题:

Spring云流:StreamBridge和事务

乌灿
2023-03-14

我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。

这是我的lib版本:

spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4

我的Spring云流形态:

spring:
  cloud:
    function:
      definition: test
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
        bindings:
          test-in-0:
            consumer:
              queueNameGroupOnly: true
              receive-timeout: 500
              transacted: true
          test-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
          other-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
      bindings:
        test-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            maxAttempts: 1
        test-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response
        other-out-0:
          destination: test.other.request
          group: test.other.request
          producer:
            requiredGroups: test.other.request

我的测试java代码:

@Configuration
public class TestSender {
    @Bean
    public Function<Message<TestRequest>, Message<String>> test(Service service) {
        return (request) -> service.run(request.getPayload().getContent());
    }
}
@Component
@Transactional
public class Service {
    private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);
    StreamBridge bridge;
    IWorker worker;
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    @Transactional
    public Message<String> run(String message) {
        LOGGER.info("Processing {}", message);
        bridge.send("other-out-0", MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build());
        if (message.equals("error")) {
            throw new RuntimeException("test error");
        }
        return MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build();
    }
}

要运行的测试类:

@SpringBootApplication
public class EmptyWorkerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            LOGGER.info("Sending messages ...");
template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                                "{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        };
    }

我还添加了TransactionManager:

@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    @Bean(name = "transactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

}

在最后的这个示例中,我的兔子队列中有:

或者我应该只有两条消息test.other.request.我做错了什么?

编辑1

试验代码:

@Component("myfunction")
public class Myfunction implements Consumer<String> {

    private final StreamBridge streamBridge;

    public Myfunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    @Transactional
    public void accept(String request) {
        this.streamBridge.send("myfunction-out-0", request);
        if (request.equals("error")) {
            throw new RuntimeException("test error");
        }
    }
}
@SpringBootApplication
public class EmptyWorkerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                                "test".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "error".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "test".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        };
    }
}
spring:
  rabbitmq:
    host: xx
    port: xx
    username: xx
    password: xx
    virtual-host: xx
  cloud:
    function:
      definition: myfunction
    stream:
      rabbit:
         bindings:
           myfunction-in-0:
             queueNameGroupOnly: true
           myfunction-out-0:
             queueNameGroupOnly: true
             transacted: true
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            autoBindDlq: true
            maxAttempts: 1
        myfunction-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response

编辑2:

我终于成功了。我的错误是设置了属性spring。云流动兔子绑定。myfunction-in-0。消费者事务处理=true,而不是spring。云流动兔子绑定。中的myfunction。消费者交易=真

事实上,我不理解两者的区别,在spring cloud stream和spring cloud rabbit活页夹文档中也没有找到任何解释。

共有1个答案

景德海
2023-03-14

请看这个答案-Spring Cloud stream:如何使用@Transactional与新消费者

 类似资料:
  • 我使用的是Spring Boot 2.5.2和Spring Cloud 2020.0.3。我正在尝试包装一个rest服务调用,该调用使用JPA(CrudRepository.save)将记录保存到DB,然后使用StreamBridge使用spring cloud stream(Kafka binder)将消息发布到Kafka主题。我试过好几种方法,但似乎没有一种效果很好。我故意造成JPA问题(插入

  • 我试图编写一个spring-cloud-stream函数(spring-starter-parent 2.5.3,java 11,spring-cloud-version 2020.0.3),该函数同时具有Kafka和Postgres事务。每当使用的消息以字符串“fail”开始时,该函数将引发一个模拟错误,我希望这将导致数据库事务回滚,然后导致kafka事务回滚。(我知道Kafka交易不是XA,这

  • 我正在尝试用Java创建一个PoC应用程序,以了解在使用Kafka进行消息发布时如何在Spring Cloud Stream中进行事务管理。我试图模拟的用例是一个接收消息的处理器。然后,它进行一些处理,并生成两条发送到两个不同主题的新消息。我希望能够将这两条消息作为单个事务发布。因此,如果发布第二条消息失败,我希望滚动(而不是提交)第一条消息。SpringCloudStream支持这样的用例吗?

  • 问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。

  • 我正在使用Spring Cloud StreamBridge将消息发布到RabbitMQ交换机。使用本机RabbitMQ完美测试,我可以使用单个生产者轻松获得100kmsgs/s(1个通道)。如果我使用发送StreamBrige(也是1个通道)启动带有时循环的线程,我只获得~20kmsgs/s的类似设置(没有持久性,没有手动打包或确认,相同的Docker容器...)。我使用的是Spring Clo

  • 我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗? 这是供应商的功能bean 和配置