当前位置: 首页 > 知识库问答 >
问题:

如何在Spring集成中推迟消息的消耗

常自强
2023-03-14

我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1

目前,应用程序响应Application ationReadyEvents并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。

我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。

有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。

配置ControlBus似乎是解决方案,但我不知道如何将类似的东西与其他流连接起来。

以下是流如何使用消息的示例:

IntegrationFlows.from(sourceGateway)
                .transform(Transformers.fromJson(IncomingTask.class, jsonObjectMapper))

                .handle(IncomingTask.class, (incomingTask, headers) -> {

                //stuff with the task here.

                })

                .get();

共有1个答案

卫甫
2023-03-14

好的,你绝对可以在这件事上使用ControlBus。对于Java DSL,它看起来像:

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

要使用它,您需要:

@Autowired
@Qualifier("controlBus.input")
private MessageChannel controlBusChannel;

现在我们需要知道您的目标intgraion Flow是如何启动的。什么消耗消息。例如,我有这个:

@Bean
public IntegrationFlow fileFlow1() {
    return IntegrationFlows.from("fileFlow1Input")
            .handle(Files.outboundAdapter(tmpDir.getRoot()),
                        c -> c.id("fileWriting").autoStartup(false))
                .get();
    }

注意c.id("fileWrit"). autoStartup(false)id是用于endpointbean的,它可以通过发送到Control Bus的命令来访问。autoStartup(false)意味着它不会立即消耗消息,而是只有当我们调用start()时才会消耗消息。我是这样做的:

this.controlBusChannel.send(new GenericMessage<>("@fileWriting.start()"));

您应该在配置中确保将消息消耗推迟到您需要的时间。

 类似资料:
  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以:

  • 我配置了一个基于Web服务的入站消息传递网关。我想记录传入的SOAP消息(信封和里面的所有消息)。最好的方法是什么? 我曾尝试使用带有日志通道适配器的有线抽头,但不知道一个好的表达式值来获取实际的SOAP XML。如果入站网关配置为不提取有效负载,则我将SaajSoapMessage视为有效负载,否则将DOMSource视为有效负载。是否有一个表达式将SaajSoapMessage作为XML字符串

  • 在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法

  • 如何使用Apache Kafka生成/使用延迟消息?标准的Kafka(和Java的kafka-client)功能似乎没有这个特性。我知道我自己可以用标准的等待/通知机制来实现它,但是它看起来不是很可靠,所以任何建议和好的实践都很感谢。 找到相关问题,但没有帮助。正如我所看到的:Kafka基于从文件系统的顺序读取,并且只能用于直接读取主题,保持消息的顺序。我说的对吗?

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方