我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。
目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。
我的整个项目是这样的:
读入。拉链-
如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。
我的拉链看起来像这样:
public class ZipHandler extends AbstractMessageHandler {
File dat;
File json;
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
byte[] buffer = new byte[1024];
try {
File file = (File) message.getPayload();
ZipFile zip = new ZipFile(file);
for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
.hasMoreElements();) {
ZipEntry ze = entries.nextElement();
String name = ze.getName();
if (name.endsWith(".dat") || name.endsWith(".DAT")) {
InputStream input = zip.getInputStream(ze);
File datFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(datFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.dat = datFile;
fos.close();
} else if (name.endsWith(".json") || name.endsWith(".JSON")) {
InputStream input = zip.getInputStream(ze);
File jsonFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(jsonFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.json = jsonFile;
fos.close();
}
}
zip.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只使用基于注释的表示法而不是xml来解决这个问题。
编辑:
我只想使用DefaultAggregatingMessageGroupProssor和基于标题“zip”的correlationStrategy,以及基于消息的releaseStrategy,因为在这种情况下,两个文件应该组合成一个。
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor();
return aggregator;
}
@CorrelationStrategy
public String correlateBy(@Header("zipFile") String zip){
return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
return messages.size() == 2;
}
我会说你走对了路。由于您压缩了其中的多个文件,因此正确的要求是解压缩它并将这些文件收集为一条消息并发送以进行进一步的处理。
所以,是的,<代码>
不知道如何解压缩它们,但您确实可以使用zip文件名作为corICEF Key
并使用多个文件作为组大小来确定释放组的信号。
请随意问更多问题。但首先我需要看看你的“拉链”。
更新
首先,基于注释的聚合器配置有点有限,最好在AggregatingMessageHandler
@Bean
上使用@ServiceActivator
以更好地控制其选项。
然而,即使您选择了它,您也可以实现您的需求。但聚合器的配置应遵循POJO方法调用原则:
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public List<File> aggregate(List<File> files){
return files;
}
像这样的。
在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我
我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。
目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I
我在应用程序中使用拆分器聚合器模式。我有以下配置- 我的所有通道(CH1、CH2、CH3)都是。Splitter输入通道CH1的源代码是一个文件。 在我的测试中,我观察到即使在CH1通道中添加两个文件,在给定时间也只有一个文件被处理。所以我在我的CH1通道中添加了一个轮询器,现在正在同时处理CH1通道上的多个输入消息。 在聚合器方面,我也注意到执行总是单线程的,即直到第一个线程完成执行,第二个线程
问题:流输入仅适用于向聚合器发送输出通道输出的1个输入。随后的消息只进入丢弃通道logLateArvers。什么条件被用来发送到丢弃通道? 描述:尝试使用使用WebSphere的聚合器为基本jms移植Spring集成示例。 更新:-打开调试显示轮询器正在工作。消息被拉入并放到MQ,响应被拾取。但是,对于第一组消息之后的MQ场景,不使用AggregatingMessageHandler。消息被发送到
我一直在使用Spring boot,并且在我的项目中删除了所有文件。不幸的是,它还使用了Spring集成,根据我的经验,Spring集成非常基于。 我有一个场景,要求我有一个聚合器,并让聚合器每隔秒轮询一次。 这可以像这样使用XML来完成(从以前的SO问题中获得的示例): 我已经找到了一个类,这个类有点像是在玩把戏,它的bean定义是: 但是,只有在与此处理程序关联的中接收到新消息时,才触发的方法