当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream:有条件地回应相同主题

洪鸿
2023-03-14

我希望我的应用程序从RabbitMQ中的“消息”队列中读取一些内容,如果消息是“ping”,则用“pong”响应同一队列。到目前为止,我已经尝试了两件事:

一个消费者和供应商连接一个队列,但我总是得到"不能撰写任何与消费者"当应用程序启动。

    spring:
      cloud:
        function:
          definition: ping,pong
        stream:
          bindings:
            ping-in-0:
              destination: messages
            pong-out-0:
              destination: messages        
      rabbitmq:
        username: rabbitmq
        password: rabbitmq
    @SpringBootApplication
    public class SpringStreamPingPongApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringStreamPingPongApplication.class, args);
        }
    
        @Bean
        public Consumer<String> ping() {
            return input -> {
                System.out.println("Received: " + input);
                if (input.equals("ping"))
                    outQueue.add("pong");
            };
        }
        
        private Queue<String> outQueue = new LinkedList<String>();
        
        @Bean
        public Supplier<String> pong() {
            return () -> {
                System.out.println("poll");
                return outQueue.poll();
            };
        }
    
    }

当输入和输出队列不同时,这是可行的,但我的要求是它们相同。

java.util.函数,如果条件不满足,则返回null。

spring:
  cloud:
    function:
      definition: pingPong
    stream:
      bindings:
        pingPong-in-0:
          destination: messages
        pingPong-out-0:
          destination: messages        
  rabbitmq:
    username: rabbitmq
    password: rabbitmq
@SpringBootApplication
public class SpringStreamPingPongApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringStreamPingPongApplication.class, args);
    }

    @Bean
    public Function<String, String> pingPong() {
        return input -> {
            System.out.println("Received: " + input);
            if (input.equals("ping"))
                return "pong";
            return null;
        };
    }

}

当接收到非“ping”的内容时,我会看到消息“received:…”三次,结果失败了

2021-02-14 10:21:11.103 ERROR 382793 --- [fGaUNiBJ_Uu8A-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@22f545cd]; nested exception is java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null, failedMessage=GenericMessage [payload=byte[6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=messages, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A, amqp_redelivered=false, id=db87cc60-6ec0-a8cc-4862-115c31a7f9b2, amqp_consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, sourceData=(Body:'[B@5119ad84(byte[6])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=messages, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A]), contentType=application/json, timestamp=1613294468100, target-protocol=amqp}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null
    at org.springframework.messaging.core.GenericMessagingTemplate.sendTimeout(GenericMessagingTemplate.java:252)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractDestinationResolvingMessagingTemplate.send(AbstractDestinationResolvingMessagingTemplate.java:72)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:585)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:571)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 27 more

什么是不退货的正确方法?

还有其他我没有看到的解决方案吗?

共有1个答案

穆招
2023-03-14

首先,你有definition:ping,pong,它相当于definition:ping | pong——这是函数组合。您正在从两个函数中组合一个函数,由于消费者不产生任何输出,因此不能用任何东西组合它,这是行的末尾,因此您看到的消息就是这样的。如果要将两个函数标识为消息处理程序,则应使用 。例如定义:ping;pong。你可以在这里获得更多信息。

如果输入和输出是同一个目的地,你将创建一个无限循环,所以我很难假设这就是你想要或期望的。也许可以向您解释业务需求,以便我们可以帮助您协调正确的解决方案。说“……从RabbitMQ中的“消息”队列中读取一些内容,如果消息是“ping”,则用“pong”响应同一队列……”这不是一个业务需求,而是一个似乎是错误的假设解决方案。

 类似资料:
  • 我正在开发SpringCloudStream的Brooklyn.Release版本。我的用例具有多个接收器的HttpSource。当我将初学者应用程序依赖项添加到应用程序中并使用它时,如下所示: 我的聚合应用程序是 一直得到如下响应:

  • 假设我有一个方法: 但有时当我运行这个方法时,我不需要对任何东西进行同步。 什么是有条件同步的好模式?我能想到的唯一模式是回调,类似于这样: 有没有别的方法,不用回拨?

  • 我有一个包含15个独立ICD列(到)的数据帧,并且希望创建一个变量(0/1),当数字“323”出现在15个ICD列中的任何一个时。 dataframe本身包含30多个变量,如下所示 不完全确定我是否在正确的轨道上,但我编写了以下代码试图完成我的任务,但得到了一个错误: 在Pandas中多列上查找字符串

  • 我想有条件地显示和隐藏这个按钮组,这取决于从父组件传递的内容,看起来像这样: .... .... 然而,{this.props.showBulkActions'show':'hidden'并没有发生任何事情。我做错什么了吗?

  • 问题内容: 这听起来很简单,我想我的想法太复杂了。 我想制作一个数组,其元素是从两个形状相同的源数组生成的,具体取决于源数组中哪个元素更大。 为了显示: 我不知道如何产生一个将array1和array2的元素组合在一起的array3,以产生一个数组,其中仅取两个array1 / array2元素值中的较大者。 任何帮助将非常感激。谢谢。 问题答案: 我们可以使用NumPy内置的,正是为此而制作的-

  • 我有如下代码: 在我的属性文件中,我有: 这不起作用,parseFooBar在第一秒每分钟都被执行。 但是,如果我添加字段: 所以我可以做一个日志,看看它是什么,parseFooBar不会被执行。移除注入的字符串会再次看到parseFooBar执行。我做错了什么? 编辑:这是使用Spring 4.1.5,Spring Boot 1.2.1和JDK 8 编辑2:将注释移动到类型也有效。(无需强制使用