当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Cloud Stream MessageChannel配置@MessagingGateway?

史烨
2023-03-14

我已经开发了异步Spring云流服务,我正在尝试开发一个边缘服务,它使用@MessagingGateway提供对本质上异步的服务的同步访问。

我当前正在获取以下堆栈跟踪:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

我的@MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

如果我通过@StreamListener在回复频道上使用消息,那么它的效果很好:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      }
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);
    }
  }

在生产者方面,我正在配置必需组以确保多个消费者可以处理消息,相应地,消费者具有匹配的配置。

消费者:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created

生产商:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

生产者端处理请求并发送响应的代码位:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

我可以调试并查看请求是否被接收和处理,但当响应被发送到回复通道时,就会出现错误。

要使@MessagingGateway正常工作,我缺少哪些配置和/或代码?我知道我正在将Spring集成和Spring云网关结合在一起,所以我不确定一起使用它们是否会导致问题。

共有3个答案

古文康
2023-03-14

嗯,我有点困惑,也不知道你想完成什么,但让我们看看我们是否能解决这个问题。混合SI和SCSt绝对是自然的,因为一个建立在另一个之上,所以一切都应该工作:这是我刚刚从一个旧示例中挖掘出的示例代码片段,它公开了RESTendpoint,但委托(通过网关)到Source的输出通道。看看这是否有帮助:

@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
    . . . 

    @Autowired
    private Source channels;

    @Autowired
    private CompletionService completionService;

    @RequestMapping("/complete")
    public String completeRequest(@RequestParam int id) {
        this.completionService.complete("foo");
        return "OK";
    }

    @MessagingGateway
    interface CompletionService {
        @Gateway(requestChannel = Source.OUTPUT)
        void complete(String message);
    }
}
姬选
2023-03-14

在阿泰姆的帮助下,我找到了我一直在寻找的解决方案。我已经将Artem发布的代码分为两个服务,一个网关服务和一个CloudStream服务。为了进行测试,我还添加了一个RestController。这基本上模仿了我想要对持久队列所做的操作。感谢Artem的帮助!我真的很感谢你的时间!我希望这能帮助其他想做同样事情的人。

网关代码

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  interface GatewayChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String process(String payload);
  }

  private static final String ENRICH = "enrich";

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> getUser(@PathVariable("string") String string) {
      return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
    }
  }

}

网关配置(application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
server:
  port: 8080

CloudStream代码

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> process(Message<String> request) {
    return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        .copyHeaders(request.getHeaders()).build();
  }

}

CloudStream配置(application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            required-groups: gateway-to-uppercase-reply
server:
  port: 8081
孟绪
2023-03-14

这是个好问题,也是个好主意。但这并不容易。

首先,我们必须自己确定网关意味着请求/回复,因此相关性也就意味着请求/回复。这可以在MessagingGateway中通过replyChannel头在临时replyChannel实例中使用。即使您有一个显式的replyChannel=AccountChannel。ACCOUNT\u已创建,仅通过提及的标题及其值进行关联。事实上,这个临时ReplyChannel是不可序列化的,不能通过网络传输给另一端的消费者。

幸运的是,Spring集成为我们提供了一些解决方案。它是HeaderEnricher及其HeaderChannel注册表后面的HeaderChannel字符串选项的一部分:

从Spring Integration 3.0开始,一个新的子元素<代码>

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-恩里彻

但在这种情况下,您必须引入一个从网关到主机的内部通道,只有最后一个通道将消息发送到帐户通道。创建\u帐户\u请求。因此,replyChannel头将转换为字符串表示,并能够在网络上传输。在消费者方面,当您发送回复时,您应该确保您也传输了回复频道标题。因此,消息何时到达AccountChannel。在生产者端创建的帐户,在那里我们有消息网关,相关机制能够将信道标识符转换为适当的临时应答信道,并将应答与等待的网关呼叫相关联。

这里唯一的问题是,您的producer应用程序必须是AccountChannel组中的单个消费者。ACCOUNT\u CREATED(帐户创建)-我们必须确保云中一次只运行一个实例。因为只有一个实例的内存中有临时ReplyChannel。

有关网关的更多信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

使现代化

一些帮助代码:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();

}

使现代化

一些简单的应用程序来演示PoC:

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}

应用程序。yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

我使用spring-cloud-starter-stream-Rabbit

那个

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()

将请求头复制到回复消息的技巧。因此,网关能够在应答端将报头中的信道标识符转换为适当的临时应答信道,以便将应答正确地传递给网关的调用者。

SCSt关于该事项的问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815

 类似资料:
  • 这段配置通过上下文进行扫描--我用调试器检查了它。问题可能出在哪里?

  • 问题内容: 我尝试执行此docker命令以使用Elasticsearch设置Jaeger Agent和Jaeger Collector。 但是此命令给出以下错误。如何用ElasticSearch配置Jaeger? 问题答案: 搜索解决方案一段时间后,我发现了一个docker-compose.yml文件,该文件具有Jaeger Query,Agent,collector和Elasticsearch配

  • 摘要/问题 我参与了一个使用的项目。该项目启用了maven,当我从cli运行时,一切都很顺利。 该项目也可以在NetBeans中打开,所有内容都可以正常显示,但是当我在eclipse中打开该项目时,我看到一些与相关的错误。在spring缓存中。xml位于“网页”文件夹下。这让我觉得eclipse项目可能不会被认为是一个动态web项目,但请遵循以下说明:https://www.mkyong.com/

  • 但是如果我让Spring Boot自动配置JOOQ,那么我应该把这个设置放在哪里呢? 基本的Spring Boot配置似乎只支持在中设置,如jooq-spring-boot-example所示。 我尝试将放入中,但这对SQL没有任何影响。 有没有办法自定义Spring boot JOOQ配置,而不必自己配置JOOQ? 我使用的是SpringBoot 2.1.7.Release和JOOQ 3.11.

  • 问题内容: 我将jaybird 2.2.3和hibernate 3.5一起使用,当我使用向导hibernate映射文件和pojos数据库时,出现以下错误“ java.lang.NullPointerException” 我在mysql上使用了相同的映射,并且可以正常工作,所以我相信这是jaybird中的问题 编辑: 有人能帮我吗? 问题答案: 在深入研究了Netbeans的代码并向Jaybird添

  • 如何使用Sonar 2.3和Maven 3.0配置JaCoCo。4. 我的项目是一个多模块Maven项目。我的顶级pom包括以下内容: 运行mvn声纳后:声纳我得到0%代码覆盖报告声纳。我应该看到至少50%。注意,我确实在每个子项目的目标文件夹中看到了jacoco.exec。

  • 我正在尝试了解如何使用Spring。 我试着效仿这个例子。但是,当我从Eclipse启动Tomcat时,我得到了以下错误: 这是DispatcherServlet的相关代码。xml salvaUtente()是一种方法,我将在其中处理多个插入(现在仍然是exmpty)。

  • 我以前的activiti版本是5.11,现在我正试图将其更新到5.21.0。在前面的配置中,使用restlet servlet。现在我尝试通过webconfigurer.class来配置它。我已经在web.xml中注入了webconfigurer侦听器。但它抛出了一个错误。 web.xml条目