我对Spring Integration是新手。我尝试使用文件拆分器将消息从文件中拆分出来,然后使用.aggregate()构建单个消息并发送到输出通道。我有标记为true,因此apply-sequence现在默认为false。我已经使用EnricHeaders将correlationId设置为常量“1”。我在设置realease策略时遇到了困难,因为我没有在序列结束上保持。下面是我的代码的外观。
IntegrationFlows
.from(s -> s.file(new File(fileDir))
.filter(getFileFilter(fileName)),
e -> e.poller(poller))
.split(Files.splitter(true, true)
.charset(StandardCharsets.US_ASCII),
e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));
IntegrationFlow integrationFlow = integrationFlowBuilder
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
@Bean
public IntegrationFlow itemExcludes() {
return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
.aggregate(aggregator -> aggregator
.outputProcessor(group -> group.getMessages()
.stream()
.map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
.collect(Collectors.joining(","))))
.transform(Transformers.toJson())
.channel(customSource.itemExclude());
}
@Bean
public IntegrationFlow itemExcludeMarkers() {
return flow -> flow
.log(LoggingHandler.Level.INFO)
.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
.<FileHandler>handle(new FileHandler(configProps))
.channel(NULL_CHANNEL);
}
任何帮助都很感激。
我将在拆分器
之前为correlationid
移动头部enricher并使其如下所示:
.enrichHeaders(h -> h
.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
m -> m.getHeaders().getId()))
常量CorrelationID
在多线程环境中绝对不好:不同的线程拆分不同的文件并将不同的行发送到同一个聚合器。因此,使用“1”
作为关联键,您将始终拥有一个要聚合和释放的组。默认的序列行为是将原始消息ID
填充到CorrelationID
。由于您将不依赖filesplitter
中的applysequence
,所以我建议使用简单的解决方案来模拟这种行为。
正如Gary在回答中指出的,您需要考虑自定义releaseStrategy
,并将filesplitter.fileMarker
发送到聚合器。filesplitter.fileMarker.end
具有lineCount
属性,可以将该属性与messageGroup.size
进行比较,以决定是否可以释放组。在生成输出结果的过程中,MessageGroupProcessor
确实必须过滤FilespLitter.FileMarker
消息。
我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。
问题内容: 如何建立持续集成标签而不是分支的实践? 我具有标签存储库目录的以下结构: 我想配置我的持续集成工具(可以是CruiseControl,Hudson和Jenkins到TeamCity的任何工具),以在两个文件夹中的任何一个中创建了最新标记。 例如,如果结构已更改并且标签已出现在目录中,我想触发标签下的源代码构建: 是否可以使用任何现有的持续集成工具在标签下构建源代码,或者为此目的我应该编
构建 当项目开发完毕,只需要运行一行命令就可以打包你的应用: # 打包正式环境 npm run build:prod # 打包预发布环境 npm run build:stage 构建打包成功之后,会在根目录生成 dist 文件夹,里面就是构建打包好的文件,通常是 ***.js 、***.css、index.html 等静态文件。 如果需要自定义构建,比如指定 dist 目录等,则需要通过 co
在项目正常开发,预览效果达到要求的情况下,可以构建项目来进行更多的测试。 导读 熟悉构建发布面板 了解通用构建参数 命令行发布项目 定制项目的构建模版 扩展构建流程 构建流程简介与常见错误处理 发布到 web 平台 发布到原生平台 安装配置原生环境 原生平台 JavaScript 调试 发布到 iOS App Clip (轻 App) 发布到支付宝小游戏 发布到字节小游戏 发布到 Cocos Pl
我们有一个包含多个子项目的多项目gradle存储库。其中一个子项目应该生成单独的testJar并将其发布到本地maven存储库: 这有效,我可以看到是在中生成的。 然后,在根项目gradle中。kts我们设立了一个发布扩展: 这也正常工作,我可以看到模块发布在目录中。但是,没有发布。我找不到如何将工件附加到根项目中定义的发布。
现在我们来试一试从代码托管平台上拉取代码,并进行镜像构建和持续集成。 创建新项目并构建镜像 在控制台点击「代码构建」,然后在「代码构建」的界面中点击「创建新项目」。 第一步:填写项目名称 在「项目名称」上填写该项目的 Docker 镜像名称。 第二步:选择代码库 从第三方代码托管平台中选择我们想要构建的代码库,如果找不到需要的代码库可以点击右上角刷新按钮同步列表。 第三步:开启持续集成 持续集成是