我希望有一个Spring云流侦听器处理有关其中发送的所有消息的完整事务。或者,即使之后有异常,也会提交函数中使用StreamBridge手动发送的所有消息。
这是我的lib版本:
spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
我的Spring云流形态:
spring:
cloud:
function:
definition: test
stream:
rabbit:
default:
producer:
transacted: true
consumer:
transacted: true
bindings:
test-in-0:
consumer:
queueNameGroupOnly: true
receive-timeout: 500
transacted: true
test-out-0:
producer:
queueNameGroupOnly: true
transacted: true
other-out-0:
producer:
queueNameGroupOnly: true
transacted: true
bindings:
test-in-0:
destination: test.request
group: test.request
consumer:
requiredGroups: test.request
maxAttempts: 1
test-out-0:
destination: test.response
group: test.response
producer:
requiredGroups: test.response
other-out-0:
destination: test.other.request
group: test.other.request
producer:
requiredGroups: test.other.request
我的测试java代码:
@Configuration
public class TestSender {
@Bean
public Function<Message<TestRequest>, Message<String>> test(Service service) {
return (request) -> service.run(request.getPayload().getContent());
}
}
@Component
@Transactional
public class Service {
private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);
StreamBridge bridge;
IWorker worker;
public Service(StreamBridge bridge, IWorker worker) {
this.bridge = bridge;
this.worker = worker;
}
@Transactional
public Message<String> run(String message) {
LOGGER.info("Processing {}", message);
bridge.send("other-out-0", MessageBuilder.withPayload("test")
.setHeader("toto", "titi").build());
if (message.equals("error")) {
throw new RuntimeException("test error");
}
return MessageBuilder.withPayload("test")
.setHeader("toto", "titi").build();
}
}
要运行的测试类:
@SpringBootApplication
public class EmptyWorkerApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);
public static void main(String[] args) {
SpringApplication.run(EmptyWorkerApplication.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
LOGGER.info("Sending messages ...");
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
.setContentType("application/json")
.build());
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
.setContentType("application/json")
.build());
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
.setContentType("application/json")
.build());
};
}
我还添加了TransactionManager:
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {
@Bean(name = "transactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
RabbitTransactionManager manager = new RabbitTransactionManager(cf);
return manager;
}
}
在最后的这个示例中,我的兔子队列中有:
或者我应该只有两条消息test.other.request.我做错了什么?
编辑1
试验代码:
@Component("myfunction")
public class Myfunction implements Consumer<String> {
private final StreamBridge streamBridge;
public Myfunction(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@Override
@Transactional
public void accept(String request) {
this.streamBridge.send("myfunction-out-0", request);
if (request.equals("error")) {
throw new RuntimeException("test error");
}
}
}
@SpringBootApplication
public class EmptyWorkerApplication {
public static void main(String[] args) {
SpringApplication.run(EmptyWorkerApplication.class, args);
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
RabbitTransactionManager manager = new RabbitTransactionManager(cf);
return manager;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"test".getBytes(StandardCharsets.UTF_8))
.setContentType("text/plain")
.build());
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"error".getBytes(StandardCharsets.UTF_8))
.setContentType("text/plain")
.build());
template.convertAndSend("test.request", "#",
org.springframework.amqp.core.MessageBuilder.withBody(
"test".getBytes(StandardCharsets.UTF_8))
.setContentType("text/plain")
.build());
};
}
}
spring:
rabbitmq:
host: xx
port: xx
username: xx
password: xx
virtual-host: xx
cloud:
function:
definition: myfunction
stream:
rabbit:
bindings:
myfunction-in-0:
queueNameGroupOnly: true
myfunction-out-0:
queueNameGroupOnly: true
transacted: true
bindings:
myfunction-in-0:
destination: test.request
group: test.request
consumer:
requiredGroups: test.request
autoBindDlq: true
maxAttempts: 1
myfunction-out-0:
destination: test.response
group: test.response
producer:
requiredGroups: test.response
编辑2:
我终于成功了。我的错误是设置了属性spring。云流动兔子绑定。myfunction-in-0。消费者事务处理=true,而不是spring。云流动兔子绑定。中的myfunction。消费者交易=真
事实上,我不理解两者的区别,在spring cloud stream和spring cloud rabbit活页夹文档中也没有找到任何解释。
请看这个答案-Spring Cloud stream:如何使用@Transactional与新消费者
我使用的是Spring Boot 2.5.2和Spring Cloud 2020.0.3。我正在尝试包装一个rest服务调用,该调用使用JPA(CrudRepository.save)将记录保存到DB,然后使用StreamBridge使用spring cloud stream(Kafka binder)将消息发布到Kafka主题。我试过好几种方法,但似乎没有一种效果很好。我故意造成JPA问题(插入
我试图编写一个spring-cloud-stream函数(spring-starter-parent 2.5.3,java 11,spring-cloud-version 2020.0.3),该函数同时具有Kafka和Postgres事务。每当使用的消息以字符串“fail”开始时,该函数将引发一个模拟错误,我希望这将导致数据库事务回滚,然后导致kafka事务回滚。(我知道Kafka交易不是XA,这
我正在尝试用Java创建一个PoC应用程序,以了解在使用Kafka进行消息发布时如何在Spring Cloud Stream中进行事务管理。我试图模拟的用例是一个接收消息的处理器。然后,它进行一些处理,并生成两条发送到两个不同主题的新消息。我希望能够将这两条消息作为单个事务发布。因此,如果发布第二条消息失败,我希望滚动(而不是提交)第一条消息。SpringCloudStream支持这样的用例吗?
问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。
我正在使用Spring Cloud StreamBridge将消息发布到RabbitMQ交换机。使用本机RabbitMQ完美测试,我可以使用单个生产者轻松获得100kmsgs/s(1个通道)。如果我使用发送StreamBrige(也是1个通道)启动带有时循环的线程,我只获得~20kmsgs/s的类似设置(没有持久性,没有手动打包或确认,相同的Docker容器...)。我使用的是Spring Clo
我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗? 这是供应商的功能bean 和配置