当前位置: 首页 > 知识库问答 >
问题:

通过JDK 7下的Java DSL配置Spring集成聚合器

胡安怡
2023-03-14

这是我第一次在Java7下通过DSL配置Spring集成。因为我们知道Lambda表达式只在Java8下工作。所以我参考了Spring集成JavaDSL和Spring集成JavaDSL(Java8):逐行教程的例子,使我的配置如下收集相同资源的每100条消息发送到远程RESTful服务。

@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
                                          @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {

    return IntegrationFlows.from("rawStringParsingRequestChannel")
                           .transform(new RawStringToCheckDataMessageTransformer())
                           .transform(new DataMessageToDtoTransformer())
                           .aggregate(new Consumer<AggregatorSpec>(){

                                @Override public void accept(AggregatorSpec aggregatorSpec) {
                                    aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)
                                                  .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                                                  .releaseStrategy(new MessageCountReleaseStrategy(100))
                                                  .sendPartialResultOnExpiry(true)
                                                  .groupTimeoutExpression("60000") ;
                                }
                           })
                           .transform(headerEnricher)
                           .transform(new ObjectToJsonTransformer())
                           .handle(httpOutboundAdapter)
                           .get();
}

然而,配置对我不起作用,它会引发如下异常。

Exception in thread "main" java.lang.IllegalStateException: Failed to process message list
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:79)
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:86)
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:84)
    at org.springframework.integration.dsl.AggregatorSpec$MessageGroupProcessorWrapper.processMessageGroup(AggregatorSpec.java:127)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:665)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:392)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy45.sendRawData(Unknown Source)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:82)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:68)
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:697)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726)
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.retrieveHistoricData(HistoricDataRetriever.java:92)
    at prototype.healthcloud.historic.data.pusher.Application.main(Application.java:119)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:640)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:211)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
    at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:154)
    at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:71)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:66)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:319)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:160)
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:73)
    ... 61 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:636)
    ... 74 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
    at org.springframework.util.Assert.state(Assert.java:70)
    at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:920)
    ... 79 more

根本原因在o.s.i.u.MessagingMethodInvokerHelper$HandlerMethod generateExpression中,annotationType为NULL,parameterType o.s.i.s.MessageGroup既不是集合集合的子接口

由于我的聚合逻辑非常简单,我找到了一个解决方案,方法是如下所示指定输出表达式。

@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
                                          @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {

    return IntegrationFlows.from("rawStringParsingRequestChannel")
                           .transform(new RawStringToCheckDataMessageTransformer())
                           .transform(new DataMessageToDtoTransformer())
                           .aggregate(new Consumer<AggregatorSpec>(){

                                @Override public void accept(AggregatorSpec aggregatorSpec) {
                                    aggregatorSpec.outputExpression("#this.![payload]")
                                                  .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                                                  .releaseStrategy(new MessageCountReleaseStrategy(100))
                                                  .sendPartialResultOnExpiry(true)
                                                  .groupTimeoutExpression("60000") ;
                                }
                           })
                           .transform(headerEnricher)
                           .transform(new ObjectToJsonTransformer())
                           .handle(httpOutboundAdapter)
                           .get();
}

到目前为止,变通解决方案对我来说是可行的,但我的问题是,如果聚合逻辑很复杂,如何配置处理器。


共有1个答案

戴品
2023-03-14

聚合器pec.processor(new SimpleMessageGroupProcencer(), null)

你不能用具体的MessageGroupProcessor来处理这个方法;它需要一个POJO bean和方法名(如果bean上只有一个符合条件的方法,则可以为null)。

使用

aggregatorSpec。outputProcessor(新的SimpleMessageGroupProcessor())

请注意,该处理器的输出将是消息组,这可能不是您想要的。

您可能需要考虑使用<代码> Debug TegGalgEngEngEngulpPultPux<代码>(如果您不提供<代码> OutPuthPrime<代码>),则默认为默认值。

 类似资料:
  • 我正在尝试做一个GroupBy基于共享ID的GeoJSON功能列表,以便通过使用拆分/聚合来聚合这些功能的单个字段,如下所示: 除非我取消对这三行的注释,否则聚合器永远不会发布组,数据库也不会收到任何更新。如果我将groupTimeout设置为小于5秒,则会丢失部分结果。 我预计发布策略默认为,我预计在处理完所有(拆分)功能后会自动释放所有组(REST服务消息中总共只有129个功能)。手动将其设置

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。

  • 在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我

  • 我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。

  • 我正在尝试使用Spring集成配置以下内容: 向频道发送消息。 将此消息发布到与 n 个使用者的兔子扇出(发布/订阅)交换。 每个使用者都提供一条响应消息。 让 Spring 集成聚合这些响应,然后再将它们返回给原始客户端。 到目前为止,我有一些问题。。。 > 我使用发布订阅通道来设置属性,以便correlationId、sequenceSize sequenceSize 属性仅设置为 1,即使在