当前位置: 首页 > 知识库问答 >
问题:

Spring集成-聚合和转换

连曜灿
2023-03-14

在我的用例中,最简单的集成组件安排是什么:

  1. 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。
  2. 将消息存储在缓冲区中最多10秒(聚合)
  3. 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...)
  4. 释放所有分组的消息并转换为新的聚合消息。

到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我必须尝试转换每条消息并检查对象的类型是否为class1,然后使用correlationstrategy someId,如果是class2,则使用otherId。

对于问题4.)-我可以手动编码一些东西-但是转换器似乎是一个很好的组件,我只是不知道是否有像聚合转换器这样的东西,我可以为每个输入类型指定映射规则。

更新

像这样的东西:

class One{
    public String getA(){ return "1"; }
}

class Two{
    public Integer getB(){ return 1; }
}

class ReduceTo{
    public void setId(Integer id){}
    public void setOne(One one){}
    public void setTwo(Two two){}
}

public class ReducingAggregator {


    @CorrelationStrategyMethod
    public String strategy(One one){
        return one.getA();
    }

    @CorrelationStrategyMethod
    public String strategy(Two two){
        return two.getB().toString();
    }

    @AggregatorMethod
    public void reduce(ReduceTo out, One in){
        out.setId(Integer.valueOf(in.getA()));
        out.setOne(in);
    }

    @AggregatorMethod
    public void reduce(ReduceTo out, Two in){
        out.setId(in.getB());
        out.setTwo(in);
    }
}

我想注释的用例与当前的Spring不同。RediceTo可以是包括集合在内的任何对象。在config中,我们可以指定第一次传递时应该是空列表还是其他东西(例如java流中的减少)。

共有1个答案

邵弘伟
2023-03-14

不确定您希望看到什么现成的解决方案。这是你的类,也是你的方法。框架如何对其做出决策?

嗯,是的,你需要实现Cor的策略。或者你可以考虑使用ExpressionEvalue atingCor的策略,不要写Java代码:-)。

请详细说明您希望看到的开箱即用功能。

聚合变换器完全封装在Aggregator的MessageGroupProcess函数中。默认情况下,它是DefaultAggregatingMessageGroupProcess。是的,您可以自己或再次编写代码-使用ExpressionEvalue atingMessageGroupProcess并且不要再次编写Java代码:-)

 类似资料:
  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。

  • 我们的应用程序中存在以下问题。消息通过入站通道适配器传入,并使用持久消息存储在聚合器中累积。一旦释放策略中定义的条件返回true,消息将被发送到处理的下一阶段。如果在下一个处理阶段抛出异常,事务将回滚,消息将再次放入持久消息存储中。但是,事务不会将消息放回原始队列,因为消息一旦放在聚合器中就会被确认。这不是我们想要的。理想情况下,如果在处理聚合器已批处理的其中一条下游消息时发生异常,则事务只会回滚

  • 问题:流输入仅适用于向聚合器发送输出通道输出的1个输入。随后的消息只进入丢弃通道logLateArvers。什么条件被用来发送到丢弃通道? 描述:尝试使用使用WebSphere的聚合器为基本jms移植Spring集成示例。 更新:-打开调试显示轮询器正在工作。消息被拉入并放到MQ,响应被拾取。但是,对于第一组消息之后的MQ场景,不使用AggregatingMessageHandler。消息被发送到

  • 如何识别代码中的组合和聚合?特别是在为现有代码绘制类图时? 我知道组成是“HAS-a”关系,聚合是“PART OF”关系。我知道,在组合子类中,实例将随类一起销毁,而在聚合中则不会。 下面是一个 C/CLI 代码 报警 报警.cpp 据我所知,警报和通知之间的联系是组合,因为没有就没有。我说的对吗?如果我是对的,我怎样才能使这段代码在两个类之间具有聚合关系?请问那里的代码示例? 请帮忙。