当前位置: 首页 > 知识库问答 >
问题:

Spring集成-路由到多个通道时丢失消息

居飞扬
2023-03-14

我是Spring集成的新手,正在研究一个从单个通道向多个通道发送消息的示例,从这个角度来看,为每个通道使用Redis消息存储,目的是不丢失任何消息。要求将消息发送到通道-replyChannel、mailChannel和dbChannel。目前,代码只打印sysout语句,没有主要功能

为了检查消息是否被正确路由,我编写了一个java测试类来发送15条消息。

检查输出,我发现一些消息正在丢失。也没有显示任何异常。

谢谢你的支持。

@Bean
public MessageChannel replyQueueChannel () {    return new QueueChannel (new MessageGroupQueue(redisMesageStore(),replyQueue, 1000));    }  

@Bean
public MessageChannel mailQueueChannel () {    return new QueueChannel (new MessageGroupQueue(redisMesageStore(),mailQueue, 1000));    }            

@Bean
public MessageChannel auditlogQueueChannel () {     return new QueueChannel (new MessageGroupQueue(redisMesageStore(),auditLogQueue, 1000));    }
@Override
@ServiceActivator (inputChannel="dbQueueChannel", poller =@Poller(fixedDelay="10", taskExecutor="dbServiceExecutor"))
public void executeDBConditions (Message<DeferredModel> deferredMsg) {
replyQueueChannel.send(deferredMsg);        
auditlogQueueChannel.send(deferredMsg);
mailQueueChannel.send (deferredMsg);        
}

共有1个答案

司寇嘉茂
2023-03-14

第一步总是为org启用调试日志记录。springframework;您将看到许多显示消息流的调试消息。

您还可以使用redis-cli监视命令监视redis以跟踪redis活动。

如果您无法使用这些技术弄清楚发生了什么,请将日志发布在github gist或pastebin等地方。

编辑

这是您的场景的工作版本;我对“丢失消息”没有问题。由于您没有显示完整的应用程序,我不知道您为什么会遇到问题。

我还添加了一个使用在Java中配置的RecipientListRout的版本-但是你真的应该把它作为一个新问题来问,而不是背着这个问题...

package com.example;

import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.redis.store.RedisChannelMessageStore;
import org.springframework.integration.store.MessageGroupQueue;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;

@SpringBootApplication
public class So34628789Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So34628789Application.class, args);
        MessageChannel foo = context.getBean("foo", MessageChannel.class);
        for (int i = 0; i < 15; i++) {
            foo.send(MessageBuilder.withPayload("foo" + i).build());
        }
        getOutput(context);

        // now with an RLR...

        foo = context.getBean("routedMessageChannel", MessageChannel.class);
        for (int i = 0; i < 15; i++) {
            foo.send(MessageBuilder.withPayload("foo" + (i + 15)).build());
        }
        getOutput(context);

        context.close();
    }

    private static void getOutput(ConfigurableApplicationContext context) {
        int n = 0;
        PollableChannel out = context.getBean("replyQueueChannel", PollableChannel.class);
        for (int i = 0; i < 15; i++) {
            Message<?> received = out.receive(10000);
            if (received != null) {
                System.out.println(received);
                n++;
            }
        }
        out = context.getBean("mailQueueChannel", PollableChannel.class);
        for (int i = 0; i < 15; i++) {
            Message<?> received = out.receive(10000);
            if (received != null) {
                System.out.println(received);
                n++;
            }
        }
        out = context.getBean("auditlogQueueChannel", PollableChannel.class);
        for (int i = 0; i < 15; i++) {
            Message<?> received = out.receive(10000);
            if (received != null) {
                System.out.println(received);
                n++;
            }
        }
        Assert.state(n == 45, "expected 45 messages");
    }

    @Autowired
    private JedisConnectionFactory connectionFactory;

    @Bean
    public MessageChannel replyQueueChannel() {
        return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "replyQueue", 1000));
    }

    @Bean
    public MessageChannel mailQueueChannel() {
        return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "mailQueue", 1000));
    }

    @Bean
    public MessageChannel auditlogQueueChannel() {
        return new QueueChannel(new MessageGroupQueue(redisMessageStore(), "auditLogQueue", 1000));
    }

    @Bean
    public RedisChannelMessageStore redisMessageStore() {
        return new RedisChannelMessageStore(connectionFactory);
    }

    @Bean
    public MessageChannel foo() {
        return new DirectChannel();
    }

    @Bean
    public Foo fooService() {
        return new Foo();
    }

    @Bean
    public MessageChannel routedMessageChannel() {
        return new DirectChannel();
    }

// Router Technique 1
//  @Bean
//  @ServiceActivator(inputChannel="routedMessageChannel")
//  public RecipientListRouter router() {
//      RecipientListRouter router = new RecipientListRouter();
//      List<Recipient> recipients = new ArrayList<>();
//      recipients.add(new Recipient(replyQueueChannel()));
//      recipients.add(new Recipient(mailQueueChannel()));
//      recipients.add(new Recipient(auditlogQueueChannel()));
//      router.setRecipients(recipients);
//      return router;
//  }

    @MessageEndpoint
    public static class Foo {

        @Autowired
        private MessageChannel replyQueueChannel;

        @Autowired
        private MessageChannel mailQueueChannel;

        @Autowired
        private MessageChannel auditlogQueueChannel;

        private List<MessageChannel> channels;

        @ServiceActivator(inputChannel="foo")
        public void sendIt(Message<String> deferredMsg) {
            replyQueueChannel.send(deferredMsg);
            auditlogQueueChannel.send(deferredMsg);
            mailQueueChannel.send (deferredMsg);
        }

        // Router Technique 2
        @Router(inputChannel="routedMessageChannel")
        public List<MessageChannel> route(Message<?> message) {
            if (this.channels == null) {
                this.channels = Arrays.asList(new MessageChannel[] { this.replyQueueChannel, this.mailQueueChannel,
                        this.auditlogQueueChannel });
            }
            return this.channels;
        }

    }

}

编辑2

有两种技术可以在Java配置中创建路由器。

使用XML中声明的endpoint会创建2个bean-一个消息处理程序和一个消费者。使用JavaConfig时,任何MessageHandler@Bean都可以用@ServiceActivator注释以创建相应的消费者。

或者,您可以使用注释为@路由的POJO方法。

我已经更新了上面的示例以显示其他技术。这对我来说也很好。

您可以在github上找到完整的项目。

正如我所说,如果您以某种方式丢失消息,那么应该使用调试日志记录以及redis cli命令来了解发生了什么。

 类似资料:
  • 我是Spring集成的新手 我的目标是将信息从一个渠道传递到另一个渠道(链式过程) 通道1--- 1. 尝试: 1.当我尝试使用@transformer无法与“erroeChannel”通信时。 问题: 找到答案 } 其他通道代码也是如此

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 如何在不使用XML的情况下将2个通道输出到具有Spring集成的单个通道。类似于以下问题多通道的消息进入单通道 在我的上下文中,我有2个PollableChannel bean,我希望将消息从这两个bean(非聚合)路由到一个@ServiceActivator,即完成如下操作:

  • 如果Spring集成通道是用任务执行器定义的,那么线程池将用于处理传入的消息。如果service activator或transformerendpoint组件从该内部通道接收消息,是否会实例化一个endpoint组件池,每个线程一个?如果这不是默认行为,那么需要什么配置来实现这一点? 这一点很重要,原因有二: > 以确保endpoint组件在内部通道使用的同一线程中处理消息,因此它们是同一事务的

  • 所以,问题在于路由器。当路由器尝试向通道发送消息时,我会收到错误:Dispatcher没有通道“newTypingNotificationHandler”的订户。输入'。但我有这个频道名称的集成流防御。 原因: org.springframework.integration.MessageDispatching异常:调度器在org.springframework.integration.dispa

  • 我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。 到目前为止我所拥有的: 预期的行为如下: 通过网关发送第一个请求 一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。 我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是: