当前位置: 首页 > 知识库问答 >
问题:

在Spring Integration中,DSL如何指定对已发布频道的订阅?

曾航
2023-03-14

当使用Spring Integration DSL builder模式时,它通常会“自动”填充元素之间所需的通道。然而,有时情况并非如此。

在高层次上,包装应用程序将元数据保存在数据库中,以便根据需要动态创建和销毁我们(可能)从未见过的跨平台流。因此,流不适合使用静态符号(如@Bean)实例化,而是必须动态创建和销毁,并在运行时在Spring上下文中注册/取消注册。

我有一个在动态创建的主流中使用的已发布消息通道和一个在动态创建的子流中使用的通道,但是我看不到如何从子流订阅main PublishChannel。

这让我把消息推入频道,但没有订阅,什么都不会发生。

提前感谢。

一些先前的研究(不是详尽的清单:

https://github.com/spring-projects/spring-integration-flow

https://dzone.com/articles/spring-integration-building

https://xpadro.com/2014/05/spring-integration-4-0-a-complete-xml-free-example.html

Spring集成网关"Dispatcher没有订阅者"

Spring集成-如何调试“Dispatcher没有订阅服务器”?

原木剪

task-scheduler-1 2020-12-31 00:25:32,526 INFO  o.s.i.g.GatewayProxyFactoryBean - started b653ca1c-038d-4567-bd4e-4c16ecc502a3.org.springframework.integration.config.ConsumerEndpointFactoryBean#3#gpfb
task-scheduler-1 2020-12-31 00:25:32,538 DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{... timestamp=1609395932538}]
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.d.BroadcastingDispatcher - No subscribers, default behavior is ignore
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{aaa=ee}], headers={aaa=ee, sequenceNumber=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, sequenceSize=2, yyy=2020-12-24 11:15:30.915278, correlationId=0eef0e4e-768c-90db-fa7b-2d1767335b26, timestamp=1609395932538}]

代码剪切:

    String channelId=getId().toString()+'.'+"mainPublishChannel";
    MessageChannel channel = MessageChannels.publishSubscribe(channelId, stepTaskExecutor).get();

    final IntegrationFlowBuilder bldr = IntegrationFlows
        .from(setupAdapter,
                c -> c.poller(Pollers.fixedRate(pollerFixedRate, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
        .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                .headerExpression("yyy", "payload[0].get(\"yyy\")"))
        .split(tableSplitter)
        .gateway(channel)   
        .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules, channel)
                    ) 
        .aggregate()
        .handle(cleanupAdapter)
        ;
...
snip
...
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules, MessageChannel publishedChannel) {

    // ??? How to subscribe this to publishedChannel??
    recipientListSpec
        .recipient(MessageChannels.publishSubscribe(this.getId().toString()+'.'+"mainReceiveChannel", stepTaskExecutor).get());
    
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
            
    recipientListSpec
        .ignoreSendFailures(true)
        .defaultOutputToParentFlow();
    
    return recipientListSpec;
}

共有1个答案

晁砚
2023-03-14

publishedChannel必须作为输入通道传递给子流

        return flowDef
            .channel(receiveChannel) //  <---- This is the reference to the main publish channel in the child flow, which allows the builder to create the subscription
            .log()
            .handle(inboundAdapter)
... snip ...

            ;
 类似资料:
  • 问题内容: 跟随Redis Pub / Sub 这工作正常,我可以使用以下任何语言发布消息 使用,我可以验证此请求是否已正确发布 当我将订阅者 块 添加到 其他类(侦听器类)中的 该频道时,问题就开始了,如下所示 中的,还表明侦听器已正确订阅 问题是,当我将订户侦听器类添加到相同的Rails应用程序时…它停止工作,导致侦听Redis服务器并停止执行任何其他代码…它只是坐在那里侦听。 因此,有一种方

  • 我正在尝试使用bot在discord服务器上创建一个语音频道,但我需要它在频道列表中的特定位置。我该怎么做?

  • 我正在开发一个使用Spring Boot版本2.0.4的Java应用程序。RELEASE和RabbitMQ版本3.7.7。该应用程序正在Redis数据库中缓存RabbitMQ中的所有消息,并且在RabbitMQ中创建新队列时必须重新发送。目前,我设法使用Event Exchange Plugin和队列名称捕获了队列创建。我正在使用AMQP出站适配器将消息发送回RabbitMQ。 流出 我可以使用r

  • 我已经完成了以下配置和java代码,以向Kafka主题发送消息。我只想发信息。 我正在使用Spring boot和kafka集成 Kafka应用程序已从Docker启动 Spring Boot应用程序通过简单的配置连接kafka 波姆。xml 应用yml 消息通道Bean 日志事件发布程序 Restendpoint发布消息 下面是我在执行其余endpoint时得到的异常跟踪。

  • 问题内容: 当前,我正在使用node.js和redis来构建应用程序,使用redis的原因是由于发布/订阅功能。该应用程序只是在用户进入用户或不在房间时通知管理员。 由于我想听join和disjoin事件,我的问题是我是否应该使用两个redisclient来听这两个事件,例如 或者只是使用一个redisclient来监听和分离回调中的逻辑 我知道这两种方式都是可行的,但是我不知道人们在哪种情况下会