当前位置: 首页 > 知识库问答 >
问题:

为什么在默认Spring云流配置中更改Spring集成消息方法处理行为

宋新知
2023-03-14

我有一个已经使用Spring集成(5.1.6最新)的应用程序。配置如下流:

@Configuration
public class SomeConfigClass {
...
    @MessagingGateway(name = "someGateway")
    interface Gateway {
        @Gateway(requestChannel = "inboundChannel")
        @Payload("T(java.time.ZonedDateTime).now()")
        void replicate();
    }

    @Bean
    public DirectChannel inboundChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow someFlow() {
        return IntegrationFlows.from(inboundChannel())
                .handle(someHandler())
                .channel(OUT)
                .get();
    }

    @Bean
    public SomeHandler someHandler() {
        return new SomeHandler();
    }
}

public class SomeHandler implements GenericHandler<Object> {
    @Override
    public Message<List<String>> handle(final Object payload, 
                                        final MessageHeaders headers) {
        ...
        return MessageBuilder
                .withPayload(someList)
                .copyHeaders(headers)
                .setHeader("custom", customHeader)
                .build();
    }
}

一切正常。

如果我试图在初始化的上下文中找到integrationArgumentResolverMessageConverterbean,我会看到下一个转换器:

  • MappingJackson2MessageConverter
  • ByteArrayMessageConverter
  • ObjectStringMessageConverter
  • GenericMessageConverter

之后,我添加到我的pom依赖项Spring Cloud Stream 2.1.2依赖项和Kinesis Binder 1.2.0。默认情况下配置绑定。

应用程序启动,但当我试图处理现有流时,它失败了,原因如下:

EL1004E:方法调用:在p. a. c. k. a. g. e类型上找不到方法句柄(java.time.ZonedDateTime,org.springframework.messaging.MessageHeaders)。在org.springframework.expression.spel.ast.的某个处理程序eference.findAccessorFor方法(方法eference.java:225)在org.springframework.expression.spel.ast.方法eference.get价值内部(方法eference.java:134)在org.springframework.expression.spel.ast.方法eference.access0美元(方法eference.java:54)在org.springframework.expression.spel.ast.方法参考$方法价值ef.get价值(方法eference.java:390)在org.springframework.expression.spel.ast.CompoundExpression.get价值内部(化合物xpression.java:90)在org.springframework.expression.spel.ast.SpelNodeImpl.getTypeValue(SpelNodeImpl.java:114)在org. springframework. space. handler. support。SpelExpress. getValue(SpelExpress. java: 365)在org. springframework. util。AbstractExpressionEvalue ator. java: 172)在org. springframework.集成。util。AbstractExpress sionEvalator.评估表达式(AbstractExpress. java: 160)在org. springframework.集成。handler. support。在org. springframework. handler. support上的MessagingFarodInvokerHelper.调用表达式(MessagingFarodInvokerHelper. java: 664)。在org. springframework. handler. support上的MessagingFarodInvokerHelper. tnkeHandlermethod(MessagingFarodInvokerHelper. java: 655)。在org. springframework. gmt. handler. support上的MessagingFarodInvokerHelper.进程(MessagingFarodInvokerHelper. java: 491)。在org. springframework. handler上的MessagingFarodInvokerHelper.进程(MessagingFarodInvokerHelper. java: 362)。在org. springframework. handler上的。服务活动GatewayProxyFactoryBean. invoke(GatewayProxyFactoryBean. java: 463)在org. springframework. aop. framework。在org. springframework. aop. framework。JdkDynamicAopProxy. Invoke(JdkDynamicAopProxy. java: 212)。代理。$Proxy444.复制(未知来源)

当我试图从初始化的上下文中获取相同的集成ArgumentResolverMessageConverbean时,我看到下一条链:

  • ApplicationJsonMessageMarshallingConverter
  • TupleJsonMessageConverter
  • ByteArrayMessageConverter
  • ObjectStringMessageConverter
  • JavaSerializationMessageConverter
  • KryoMessageConverter
  • jsonnamarshallingconverter

而且没有GenericMessageConver。据我所知,它不能被转换,因为这个转换器错过了(如果我错了,请纠正我)。

为什么在我添加Spring Cloud Stream默认配置时,行为会有所不同?或者如何定制使用特定流程的转换器链?或者如何为不同的集成流保持消息对话行为?

更新:因此,正如我所调查的那样,Spring云流不仅重新定义了默认的集成MessageConvers,而且它还重新定义了默认的HandlerFarodArgumentResolvers,用于将方法参数与消息映射。

在添加Spring云流之前:

  • 头方法ArgumentResolver
  • 头方法ArgumentResolver
  • MessageFarodArgumentResolver
  • PayloadExpressionArgumentResolver
  • NullAware PayloadArgumentResolver
  • PayloadsArgumentResolver
  • MapArgumentResolver
  • PayloadArgumentResolver

添加春云流后:

  • SmartPayloadArgumentResolver
  • 智能消息方法ArgumentResolver
  • 头方法ArgumentResolver
  • 头方法ArgumentResolver
  • PayloadExpressionArgumentResolver
  • NullAware PayloadArgumentResolver
  • PayloadExpressionArgumentResolver
  • PayloadsArgumentResolver
  • MapArgumentResolver

有两个已弃用的SmartPayloadArgumentResolverSmartMessageMethodArgumentResolver以及从字节[]有效负载转换为对象的修复程序。但我无法理解的是,为什么有两个PayloadExpressionArgumentResolver?。。

主要问题是:为什么Spring Cloud Stream默认应用程序上下文会影响Spring Integration默认应用程序上下文,我之前认为该流的解析器/转换器只与与流目标通道链接的消息endpoint相关。。。


共有2个答案

岳高明
2023-03-14

经过调查,我可以假设由于Spring Cloud Stream的问题和相应的Spring Framework的问题,方法消息处理的行为发生了变化。在Spring Binder自动配置中存在临时重写参数解析器初始化(静态方法上的bean定义BinderFactoryConfigsion#MessageHandlerFarodFactory)。

因此,链中的第一个解析器是SmartPayloadArgumentResolver,它决定需要对话。此对话由ApplicationJsonMessageMarshallingConverter启动并失败。例外情况:

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unexpected character ('-' (code 45)): Expected space separating root-level values
 at [Source: (String)"2019-09-04T01:26:20.202Z[UTC]"; line: 1, column: 6]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
 at [Source: (String)"2019-09-04T01:26:20.202Z[UTC]"; line: 1, column: 6], failedMessage=GenericMessage [payload=2019-09-04T01:26:20.202Z[UTC], headers={spanTraceId=3f87b9afc373308a, spanId=3f87b9afc373308a, nativeHeaders={spanTraceId=[3f87b9afc373308a], spanId=[3f87b9afc373308a], spanSampled=[1]}, X-B3-SpanId=3f87b9afc373308a, X-B3-Sampled=1, X-B3-TraceId=3f87b9afc373308a, id=3fbe87e3-31c4-4d21-c3fb-506c018c0e25, spanSampled=1, timestamp=1567560380202}]

因此,我在MethodReference抛出的问题中遇到了前面提到的错误。

夏建木
2023-03-14

我不确定为什么Stream会丢弃那个转换器(可能是一个bug,也许在那里打开了一个GitHub问题),但我相信您可以将其添加回流文档中讨论的@StreamMessageConader@Bean

 类似资料:
  • 我能有一份只有奴隶而没有主人的工作,听rabbitmq队列吗?我想在spring boot应用程序中使用spring批处理和spring集成来侦听队列并以面向块的方式处理消息。 我想使用Michael Minella(https://www.youtube.com/watch?v=30Tdp1mfR0g)在Spring批处理的远程组块示例中解释的配置,但没有主配置。 下面是我的工作配置。 下面是我

  • 我不是很理解,因为我可以更改所有集成流的错误通道。我需要处理像InvalidAccessTokenException这样的异常,它们可以在路由器内部的子流中抛出。 我所尝试的是通过以下方式处理来自默认通道“ErrorChannel”的异常: 此错误由具有以下签名的方法处理: 集成流的配置我解释如下: 可以在任何社交网络服务中启动的异常示例如下: 是否可能将此异常绑定到特定的错误通道?。 以下是日志

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 我正在尝试用spring cloud stream实现spring cloud契约。我有一个使用StreamBridge的制作人 方法sendMessage()是从rest控制器调用的。 我的合同是这样的: 当我运行测试时,会调用triggerCreateOrganization()方法,并在日志中看到日志消息“生产组织到主题”。 我在生成的测试的基类上有@AutoConfigureMessage

  • 我是Spring的新人。最近我试着让Spring批处理和Spring集成一起工作。我想有一个JobListener,它会监听消息到达特定的频道并启动Spring批处理作业。 我在github上找到了一个例子(https://github.com/chrisjs/spring-batch-scaling/tree/master/message-job-launch)我试图以某种方式将Spring批处