当前位置: 首页 > 知识库问答 >
问题:

消息聚合的Camel条件

艾宏远
2023-03-14

我正在寻找一种基于消息聚合的有条件处理消息的方法。我已经研究了很多方法来实现这一点,但似乎Apache Camel不支持它。我会解释这个场景,然后解释我尝试的解决方案。

  • 我试图再次获取文件来处理它们。问题是,据我所知,你不能让消费者按需购买。我尝试使用pollenfrice,但它只能获取单个文件,而不是目录中的所有文件。
  • 我试图筛选/停止父路由。这里的问题是filter()/choice...stop()/end()只会停止具有目录大小的聚合路由,而不会停止具有文件消息的父路由。我不能有条件地处理这些。
  • 我试图将聚合条件移动到我将调用的另一个路由,但这会导致与第一个解决方案相同的问题。

我考虑做的事情:

  • 重写聚合策略,以便不仅将大小聚合,还将文件本身聚合到一个GroupedExchange中。这样,我可以在检查之后再次拆分聚合。我真的不喜欢这个解决方案,因为它会导致很多样板文件,在运行时的代码中都是如此。
  • 将文件大小计算器移到处理器而不是聚合器。这会首先挫败使用骆驼的目的…我会手动获取文件并添加大小…对于每一个文件都是如此..
  • 使用ControlBus在该目录上动态启动删除路由。再一次,为了实现一些我认为应该可以通过简单路线完成的事情,我需要大量的变通方法。
  • 我想设置每个父消息的计算大小,但我不知道如何实现?
  • 另一种我没有想到的停止父路由的方法?

共有1个答案

徐学潞
2023-03-14

Camel真的很棒,但有时确实很难确定要使用哪种设计模式;)

首先,您需要保留文件对象的副本,因为在达到阈值之前,您不知道是否要删除它们--基本上(至少)有两种方法可以做到这一点。

备选案文1

第一种方法是在exchange属性中使用列表。无论您对exchange主体做什么,此属性都将一直存在。如果您查看GroupedExchangeAggregationStrategy的源代码,它可以做到以下几点:

        list = new ArrayList<Exchange>();
        answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
        // ...
        list.add(newExchange);

也可以在自己的exchange属性上手动执行相同的操作。在任何情况下,都可以像您所做的那样使用分组聚合策略。

  • 通过GroupedExchangeAggregationStrategy聚合以将单个文件保存在列表中
  • 计算文件总大小(使用处理器,或使用自定义聚合策略)
  • 使用筛选器(简单(“${body}<123”))
  • 通过拆分器(简单(“${property.camelgroupedexchange}”))“展开”聚合
  • 逐个删除文件

请让我知道,如果这是没有意义的,或者如果我误解了你的问题在任何方面。

 类似资料:
  • 我有一个简单的camel MINA服务器,使用JAVA DSL,我的运行方式与这里记录的示例类似: 独立运行骆驼并让它在JAVA中继续运行 MINA 2组件 我正在尝试创建一个托管在mina:tcp://localhost:9991(又名MyApp_B)的示例应用程序,该应用程序向托管在mina:tcp://localhost:9990(又名MyApp_A)的服务器发送一个非常简单的消息。 我想发

  • 我有一个用例: 我需要定期阅读和汇总Kafka主题的信息,并发布到不同的主题。本地存储不是一个选项。这就是我计划解决这个问题的方式,欢迎提出任何改进建议 为了安排Kafka消息的聚合和发布,计划使用聚合器EIP的completionInterval选项。这是代码。 路线是:

  • 我有两个jms消费者,每个都在不同的流中。我想使用另一个流来聚合这两条消息的消息。并且还需要保留相关ID,因为我需要分割有效负载并发送回消息。 我尝试将分散收集与两个入站 VM 一起使用,但收到以下错误: 由以下原因导致:组织.xml.sax.SAXParse 异常:cvc-complex-type.2.4.a:发现以元素“vm:入站终结点”开头的无效内容。“{”http://www.muleso

  • 我有一个集合,其中的文档如下所示:

  • 我试图开发以下过滤器与熊猫数据帧: 我有四列,,,和 如何将其作为聚合函数编写? 下面是一个编写效率低下的工作示例: 输出: