我是Spring集成的新手,正在研究一个从单个通道向多个通道发送消息的示例,从这个角度来看,为每个通道使用Redis消息存储,目的是不丢失任何消息。要求将消息发送到通道-replyChannel、mailChannel和dbChannel。目前,代码只打印sysout语句,没有主要功能。
为了检查消息是否被正确路由,我编写了一个java测试类来发送15条消息。
检查输出,我发现一些消息正在丢失。也没有显示任何异常。
谢谢你的支持。
@Bean
public MessageChannel replyQueueChannel () { return new QueueChannel (new MessageGroupQueue(redisMesageStore(),replyQueue, 1000)); }
@Bean
public MessageChannel mailQueueChannel () { return new QueueChannel (new MessageGroupQueue(redisMesageStore(),mailQueue, 1000)); }
@Bean
public MessageChannel auditlogQueueChannel () { return new QueueChannel (new MessageGroupQueue(redisMesageStore(),auditLogQueue, 1000)); }
@Override
@ServiceActivator (inputChannel="dbQueueChannel", poller =@Poller(fixedDelay="10", taskExecutor="dbServiceExecutor"))
public void executeDBConditions (Message<DeferredModel> deferredMsg) {
replyQueueChannel.send(deferredMsg);
auditlogQueueChannel.send(deferredMsg);
mailQueueChannel.send (deferredMsg);
}
第一步总是为org启用调试日志记录。springframework;您将看到许多显示消息流的调试消息。
您还可以使用
redis-cli
和监视
命令监视redis以跟踪redis活动。
如果您无法使用这些技术弄清楚发生了什么,请将日志发布在github gist或pastebin等地方。
编辑
这是您的场景的工作版本;我对“丢失消息”没有问题。由于您没有显示完整的应用程序,我不知道您为什么会遇到问题。
我还添加了一个使用在Java中配置的
RecipientListRout
的版本-但是你真的应该把它作为一个新问题来问,而不是背着这个问题...
package com.example;
import java.util.Arrays;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.redis.store.RedisChannelMessageStore;
import org.springframework.integration.store.MessageGroupQueue;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
@SpringBootApplication
public class So34628789Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So34628789Application.class, args);
MessageChannel foo = context.getBean("foo", MessageChannel.class);
for (int i = 0; i < 15; i++) {
foo.send(MessageBuilder.withPayload("foo" + i).build());
}
getOutput(context);
// now with an RLR...
foo = context.getBean("routedMessageChannel", MessageChannel.class);
for (int i = 0; i < 15; i++) {
foo.send(MessageBuilder.withPayload("foo" + (i + 15)).build());
}
getOutput(context);
context.close();
}
private static void getOutput(ConfigurableApplicationContext context) {
int n = 0;
PollableChannel out = context.getBean("replyQueueChannel", PollableChannel.class);
for (int i = 0; i < 15; i++) {
Message<?> received = out.receive(10000);
if (received != null) {
System.out.println(received);
n++;
}
}
out = context.getBean("mailQueueChannel", PollableChannel.class);
for (int i = 0; i < 15; i++) {
Message<?> received = out.receive(10000);
if (received != null) {
System.out.println(received);
n++;
}
}
out = context.getBean("auditlogQueueChannel", PollableChannel.class);
for (int i = 0; i < 15; i++) {
Message<?> received = out.receive(10000);
if (received != null) {
System.out.println(received);
n++;
}
}
Assert.state(n == 45, "expected 45 messages");
}
@Autowired
private JedisConnectionFactory connectionFactory;
@Bean
public MessageChannel replyQueueChannel() {
return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "replyQueue", 1000));
}
@Bean
public MessageChannel mailQueueChannel() {
return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "mailQueue", 1000));
}
@Bean
public MessageChannel auditlogQueueChannel() {
return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "auditLogQueue", 1000));
}
@Bean
public RedisChannelMessageStore redisMessageStore() {
return new RedisChannelMessageStore(connectionFactory);
}
@Bean
public MessageChannel foo() {
return new DirectChannel();
}
@Bean
public Foo fooService() {
return new Foo();
}
@Bean
public MessageChannel routedMessageChannel() {
return new DirectChannel();
}
// Router Technique 1
// @Bean
// @ServiceActivator(inputChannel="routedMessageChannel")
// public RecipientListRouter router() {
// RecipientListRouter router = new RecipientListRouter();
// List<Recipient> recipients = new ArrayList<>();
// recipients.add(new Recipient(replyQueueChannel()));
// recipients.add(new Recipient(mailQueueChannel()));
// recipients.add(new Recipient(auditlogQueueChannel()));
// router.setRecipients(recipients);
// return router;
// }
@MessageEndpoint
public static class Foo {
@Autowired
private MessageChannel replyQueueChannel;
@Autowired
private MessageChannel mailQueueChannel;
@Autowired
private MessageChannel auditlogQueueChannel;
private List<MessageChannel> channels;
@ServiceActivator(inputChannel="foo")
public void sendIt(Message<String> deferredMsg) {
replyQueueChannel.send(deferredMsg);
auditlogQueueChannel.send(deferredMsg);
mailQueueChannel.send (deferredMsg);
}
// Router Technique 2
@Router(inputChannel="routedMessageChannel")
public List<MessageChannel> route(Message<?> message) {
if (this.channels == null) {
this.channels = Arrays.asList(new MessageChannel[] { this.replyQueueChannel, this.mailQueueChannel,
this.auditlogQueueChannel });
}
return this.channels;
}
}
}
编辑2
有两种技术可以在Java配置中创建路由器。
使用XML中声明的endpoint会创建2个bean-一个消息处理程序和一个消费者。使用JavaConfig时,任何
MessageHandler
@Bean
都可以用@ServiceActivator
注释以创建相应的消费者。
或者,您可以使用注释为
@路由器
的POJO方法。
我已经更新了上面的示例以显示其他技术。这对我来说也很好。
您可以在github上找到完整的项目。
正如我所说,如果您以某种方式丢失消息,那么应该使用调试日志记录以及redis cli命令来了解发生了什么。
我是Spring集成的新手 我的目标是将信息从一个渠道传递到另一个渠道(链式过程) 通道1--- 1. 尝试: 1.当我尝试使用@transformer无法与“erroeChannel”通信时。 问题: 找到答案 } 其他通道代码也是如此
我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方
如何在不使用XML的情况下将2个通道输出到具有Spring集成的单个通道。类似于以下问题多通道的消息进入单通道 在我的上下文中,我有2个PollableChannel bean,我希望将消息从这两个bean(非聚合)路由到一个@ServiceActivator,即完成如下操作:
如果Spring集成通道是用任务执行器定义的,那么线程池将用于处理传入的消息。如果service activator或transformerendpoint组件从该内部通道接收消息,是否会实例化一个endpoint组件池,每个线程一个?如果这不是默认行为,那么需要什么配置来实现这一点? 这一点很重要,原因有二: > 以确保endpoint组件在内部通道使用的同一线程中处理消息,因此它们是同一事务的
所以,问题在于路由器。当路由器尝试向通道发送消息时,我会收到错误:Dispatcher没有通道“newTypingNotificationHandler”的订户。输入'。但我有这个频道名称的集成流防御。 原因: org.springframework.integration.MessageDispatching异常:调度器在org.springframework.integration.dispa
我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。 到目前为止我所拥有的: 预期的行为如下: 通过网关发送第一个请求 一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。 我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是: