我正在使用Spring Cloud Stream和RabbitMQ活页夹。它可以很好地处理字节[]负载和Java本机序列化,但我需要处理JSON负载。
这是我的处理器类。
@EnableBinding(Processor.class)
public class MessageProcessor {
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public OutputDto handleIncomingMessage(InputDto inputDto) {
// Run some job.
return new OutputDto();
}
}
输入到和输出到是带有Jackson注释的POJO。
在消费者中,您可以添加内容类型配置,例如:。
spring.cloud.stream.bindings.input.content-type: application/x-java-object;type=my.package.InputDto
您还可以添加
spring.cloud.stream.bindings.output.content-type: application/json
强制传出消息负载为JSON(用于互操作等)。
请注意,“input”(输入)和“output”(输出)是活页夹频道名称(即,如应用程序中处理器的定义)。
我认为这很有可能变得更容易或更自动化,但要在Spring Cloud中实现这一点,还需要一些工程上的努力。如果您想遵循github中的问题:https://github.com/spring-cloud/spring-cloud-stream/issues/156.
要手动将消息发送到Spring Cloud Stream,您可以自己手动设置标头(但使用Stream更容易)。JSON消息在Rabbit管理员UI中如下所示:
priority: 0
delivery_mode: 2
headers:
contentType: text/plain
originalContentType: application/json
content_type: text/plain
我在使用spring-jms模块转换来自RabbitMQ的消息时遇到了一些问题。以前,我使用Rest APIendpoint发送消息,该endpoint将消息发送到RabbitMQ队列,并使用@JMSListener方法处理它。 在内部,这种行为添加了一个字段来确定Java类型,由Spring库管理。但是,现在我想避免Rest API调用,因为它不是必需的,而且我可以直接将消息发送到RabbitM
我正在尝试用spring cloud stream实现spring cloud契约。我有一个使用StreamBridge的制作人 方法sendMessage()是从rest控制器调用的。 我的合同是这样的: 当我运行测试时,会调用triggerCreateOrganization()方法,并在日志中看到日志消息“生产组织到主题”。 我在生成的测试的基类上有@AutoConfigureMessage
我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC
使用Spring Cloud DataFlow 1.2.2版本,配置如下: 我正在尝试创建一个流,它将从特定主题中读取并将其刷新到长水槽中,如下所示: 查看日志文件,我可以看到以下错误: 我还试图为kafka源代码的消费者/生产者配置一些属性 但我得到的结果是一样的 以下是Spring DataFlow打印的消费者详细信息: 我看到了类似的查询,但没有有效的答案,什么是属性来接受二进制json消息
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
我有一个使用STOMP的SockJSJava客户端。基于此,https://github.com/rstoyanchev/spring-websocket-portfolio/blob/master/src/test/java/org/springframework/samples/portfolio/web/load/StompWebSocketLoadTestClient.java. 我的代码