我希望我的应用程序从RabbitMQ中的“消息”队列中读取一些内容,如果消息是“ping”,则用“pong”响应同一队列。到目前为止,我已经尝试了两件事:
一个消费者和供应商连接一个队列,但我总是得到"不能撰写任何与消费者"当应用程序启动。
spring:
cloud:
function:
definition: ping,pong
stream:
bindings:
ping-in-0:
destination: messages
pong-out-0:
destination: messages
rabbitmq:
username: rabbitmq
password: rabbitmq
@SpringBootApplication
public class SpringStreamPingPongApplication {
public static void main(String[] args) {
SpringApplication.run(SpringStreamPingPongApplication.class, args);
}
@Bean
public Consumer<String> ping() {
return input -> {
System.out.println("Received: " + input);
if (input.equals("ping"))
outQueue.add("pong");
};
}
private Queue<String> outQueue = new LinkedList<String>();
@Bean
public Supplier<String> pong() {
return () -> {
System.out.println("poll");
return outQueue.poll();
};
}
}
当输入和输出队列不同时,这是可行的,但我的要求是它们相同。
java.util.函数,如果条件不满足,则返回null。
spring:
cloud:
function:
definition: pingPong
stream:
bindings:
pingPong-in-0:
destination: messages
pingPong-out-0:
destination: messages
rabbitmq:
username: rabbitmq
password: rabbitmq
@SpringBootApplication
public class SpringStreamPingPongApplication {
public static void main(String[] args) {
SpringApplication.run(SpringStreamPingPongApplication.class, args);
}
@Bean
public Function<String, String> pingPong() {
return input -> {
System.out.println("Received: " + input);
if (input.equals("ping"))
return "pong";
return null;
};
}
}
当接收到非“ping”的内容时,我会看到消息“received:…”三次,结果失败了
2021-02-14 10:21:11.103 ERROR 382793 --- [fGaUNiBJ_Uu8A-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@22f545cd]; nested exception is java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null, failedMessage=GenericMessage [payload=byte[6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=messages, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A, amqp_redelivered=false, id=db87cc60-6ec0-a8cc-4862-115c31a7f9b2, amqp_consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, sourceData=(Body:'[B@5119ad84(byte[6])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=messages, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A]), contentType=application/json, timestamp=1613294468100, target-protocol=amqp}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null
at org.springframework.messaging.core.GenericMessagingTemplate.sendTimeout(GenericMessagingTemplate.java:252)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractDestinationResolvingMessagingTemplate.send(AbstractDestinationResolvingMessagingTemplate.java:72)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:585)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:571)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 27 more
什么是不退货的正确方法?
还有其他我没有看到的解决方案吗?
首先,你有definition:ping,pong
,它相当于definition:ping | pong
——这是函数组合。您正在从两个函数中组合一个函数,由于消费者
不产生任何输出,因此不能用任何东西组合它,这是行的末尾,因此您看到的消息就是这样的。如果要将两个函数标识为消息处理程序,则应使用
。例如
定义:ping;pong
。你可以在这里获得更多信息。
如果输入和输出是同一个目的地,你将创建一个无限循环,所以我很难假设这就是你想要或期望的。也许可以向您解释业务需求,以便我们可以帮助您协调正确的解决方案。说“……从RabbitMQ中的“消息”队列中读取一些内容,如果消息是“ping”,则用“pong”响应同一队列……”这不是一个业务需求,而是一个似乎是错误的假设解决方案。
我正在开发SpringCloudStream的Brooklyn.Release版本。我的用例具有多个接收器的HttpSource。当我将初学者应用程序依赖项添加到应用程序中并使用它时,如下所示: 我的聚合应用程序是 一直得到如下响应:
假设我有一个方法: 但有时当我运行这个方法时,我不需要对任何东西进行同步。 什么是有条件同步的好模式?我能想到的唯一模式是回调,类似于这样: 有没有别的方法,不用回拨?
我有一个包含15个独立ICD列(到)的数据帧,并且希望创建一个变量(0/1),当数字“323”出现在15个ICD列中的任何一个时。 dataframe本身包含30多个变量,如下所示 不完全确定我是否在正确的轨道上,但我编写了以下代码试图完成我的任务,但得到了一个错误: 在Pandas中多列上查找字符串
我想有条件地显示和隐藏这个按钮组,这取决于从父组件传递的内容,看起来像这样: .... .... 然而,{this.props.showBulkActions'show':'hidden'并没有发生任何事情。我做错什么了吗?
问题内容: 这听起来很简单,我想我的想法太复杂了。 我想制作一个数组,其元素是从两个形状相同的源数组生成的,具体取决于源数组中哪个元素更大。 为了显示: 我不知道如何产生一个将array1和array2的元素组合在一起的array3,以产生一个数组,其中仅取两个array1 / array2元素值中的较大者。 任何帮助将非常感激。谢谢。 问题答案: 我们可以使用NumPy内置的,正是为此而制作的-
我有如下代码: 在我的属性文件中,我有: 这不起作用,parseFooBar在第一秒每分钟都被执行。 但是,如果我添加字段: 所以我可以做一个日志,看看它是什么,parseFooBar不会被执行。移除注入的字符串会再次看到parseFooBar执行。我做错了什么? 编辑:这是使用Spring 4.1.5,Spring Boot 1.2.1和JDK 8 编辑2:将注释移动到类型也有效。(无需强制使用