I'm interested in using publisher confirms for some of our producers; we use Spring Cloud Stream in one of our projects. I tried to build a small PoC, but it doesn't work. From what I can see in the documentation, asynchronous publisher confirms are supported and should only require the following changes:
Add confirmAckChannel to application.yml and enable the errorChannelEnabled property:
spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
      ....
  bindings:
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json
  rabbit.bindings:
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true
Then a simple service, triggered from an endpoint, where I set the errorChannel header on the event:
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {
        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        // withPayload() is statically imported from MessageBuilder
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}
Then the publisher to RabbitMQ:
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}
where MessagingChannels is implemented as:
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();

    @Output("testOutput")
    MessageChannel test();
}
After that, I implemented two listeners, one for the errorChannelTest input and another one for testAck:
@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {
        log.info("Message error received: " + message);
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {
        log.info("Message ACK received: " + message);
    }
}
However, I never receive any ACK or NACK from RabbitMQ in these two listeners; the event is sent to RabbitMQ correctly and handled by the exchange, but I get no response back from RabbitMQ.
Am I missing something? I also tried these two properties, without much luck:
spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true
I'm using Spring Cloud Stream 3.0.1.RELEASE and spring-cloud-starter-stream-rabbit 3.0.1.RELEASE.
---- EDITED ----
This is the working example, updated following Gary Russell's suggestions.
application.yml
spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
  bindings:
    testOutput:
      destination: exchange.output.test
      binder: rabbitDefault
      content-type: application/json
    testOutput.producer:
      errorChannelEnabled: true
  rabbit.bindings:
    testOutput.producer:
      confirmAckChannel: "testAck"

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
TestService
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage(Test test) {
        testPublisher.send(addHeaders(test));
    }

    private Message<Event<TestEvent>> addHeaders(Test test) {
        return withPayload(new Event<>(TestEvent.builder().test(test).build()))
                .build();
    }
}
TestService is triggered from an endpoint in the following simple controller, used to exercise this PoC:
@RestController
@RequiredArgsConstructor
public class TestController {

    private final TestService testService;

    @PostMapping("/services/v1/test")
    public ResponseEntity<Object> test(@RequestBody Test test) {
        testService.sendMessage(test);
        return ResponseEntity.ok().build();
    }
}
Then the RabbitMQ publisher with the two ServiceActivators:
@Slf4j
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.test().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}
where MessagingChannels is implemented as:
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("testOutput.errors")
    MessageChannel testError();

    @Output("testOutput")
    MessageChannel test();
}
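The TEST_ACK and TEST_ERROR constants referenced by the @ServiceActivator annotations above are not shown in the post; assuming they simply mirror the channel names declared in MessagingChannels, a minimal sketch could be:

public final class TestChannels {

    // Hypothetical constants, assumed to match the channel names used above:
    // the confirm-ack channel and the producer error channel for testOutput.
    public static final String TEST_ACK = "testAck";
    public static final String TEST_ERROR = "testOutput.errors";

    private TestChannels() {
    }
}

They would then be statically imported in TestPublisher.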
And this is the application's main class (I also tried it with @EnableIntegration):
@EnableBinding(MessagingChannels.class)
@SpringBootApplication
@EnableScheduling
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
The testAck channel should not be a binding; it should be a ServiceActivator.
.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
That won't work in this case; the errors are sent to the channel named testOutput.errors; again, this needs a ServiceActivator, not a binding.
You have the error channel enabled in the wrong place; it is a common producer property, not a rabbit-specific one.
@SpringBootApplication
@EnableBinding(Source.class)
public class So62219823Application {

    public static void main(String[] args) {
        SpringApplication.run(So62219823Application.class, args);
    }

    @InboundChannelAdapter(channel = "output")
    public String source() {
        return "foo";
    }

    @ServiceActivator(inputChannel = "acks")
    public void acks(Message<?> ack) {
        System.out.println("Ack: " + ack);
    }

    @ServiceActivator(inputChannel = "output.errors")
    public void errors(Message<?> error) {
        System.out.println("Error: " + error);
    }
}
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
      rabbit:
        bindings:
          output:
            producer:
              confirm-ack-channel: acks
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
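A hypothetical follow-up sketch: negative confirms and returned messages could be told apart on the producer error channel. The NackedAmqpMessageException and ReturnedAmqpMessageException types are assumed here to come from the rabbit binder's support classes; verify the exact names and package against your binder version.

@Component
public class ProducerErrorHandler {

    @ServiceActivator(inputChannel = "output.errors")
    public void errors(ErrorMessage error) {
        // The payload of an ErrorMessage is always a Throwable.
        Throwable cause = error.getPayload();
        if (cause instanceof NackedAmqpMessageException) {
            // The broker negatively confirmed (nacked) the publish.
            System.out.println("Nacked: " + cause);
        }
        else if (cause instanceof ReturnedAmqpMessageException) {
            // With publisher-returns: true, an unroutable message is returned by the broker.
            System.out.println("Returned: " + cause);
        }
        else {
            System.out.println("Other producer error: " + cause);
        }
    }
}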