当前位置: 首页 > 知识库问答 >
问题:

Spring集成-查看尚未添加的消息的ReleaseStrategy

柯昆
2023-03-14

我有一些消息是按组(比如50个)提取到系统中的,需要按AGGREGATION\u ID分组到消息列表中,并进一步发送到流中。

我可以使用correlationStrategy来聚合该id,但我需要知道何时释放聚合的消息。在ReleaseStrategy中,我只能查看已经添加到聚合中的消息,但我需要知道在50个具有相同AGGREGATION\u ID的抓取组中何时不再有消息,才能知道何时发送该组。我该怎么做?

共有1个答案

丘普松
2023-03-14

ReleaseStrategy可以是对整个应用程序上下文具有完全访问权限的任何bean。如果您在聚合之前有存储这些消息的某个地方,您肯定可以从自定义ReleaseStrategy实现中查看该地方。

另一方面,我建议查看聚合器的groupTimeout选项:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/message-routing.html#agg-和分组到。因此,按照正常的行为,您的组将按照预期的大小(50)进行收集,但当一段时间内没有新消息时,将发布一个组,其中包含迄今为止存在的任何消息。您还可以将该组超时配置为SpEL表达式,这样也可以访问应用程序上下文。

 类似资料:
  • 我试图理解在Spring集成中聚合时返回的类型,这相当困难。我正在使用Project Reactor,我的代码片段是: 除了理解示例中传递的类型之外,我还想知道如何才能知道中流动的对象及其类型。

  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 使用Java 8 streams,我可以通过分类器对消息进行分组: 我想写一个聚合器,将消息进行相应的分组。在如上所示的五条有效载荷消息中,我想生成三条消息:第一条消息应将“a”作为有效载荷,第二条消息应将三条“b”作为有效载荷,第三条消息应将“c”作为有效载荷。 当达到序列大小时,应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。 在发布策略中,我可以访问序列大小,但我无法找

  • 我正在尝试创建一个TCP服务器,该服务器在端口5002上接受来自外部程序的消息。但是,它不接收来自外部程序的消息。 为了验证我的TCP服务器是否正常工作,我像这样使用了telnet,程序确实收到了文本“hello”。 设置wireshark时,我可以看到计算机正在端口5002上接收来自外部程序(我期待)的消息。为什么我的程序无法接收这些消息? 关于最终解决方案的最新情况: 由于负载没有停止线,我必

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 我正在开发一个Spring应用程序,它每分钟将接收大约500条xml消息。下面的xml配置只允许每分钟处理大约60条消息,其余消息存储在队列中(持久化在DB中),并以每分钟60条消息的速率检索。 尝试从多个来源阅读文档,但仍然不清楚轮询器和任务执行器的角色。我对当前每分钟处理60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次),“每轮询最大消息数”设置为10,