我正在使用SpringCloudStream进行消息传递。在消费者部分,我使用IntegrationFlow来监听队列。它正在监听并打印来自制作人的信息。但格式不同,这是我现在面临的问题。生产者的内容类型是application/json,IntegrationFLow消息负载显示ASCII数字。下面给出了为消费者编写的代码
@EnableBinding(UserOperationConsume.class)
public class ConsumerController {
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows
.from(u.userRegistraionProduces())
//.transform(Transformers.toJson()) // not working as expected
//.transform(Transformers.fromJson(UserDTO.class))
.handle(String.class, (payload, headers) -> {
System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118...
return null;
}).get();
}
}
输入接口是,
public interface UserOperationConsume {
@Input
public SubscribableChannel userRegistraionProduces();
}
消费者的yml配置是,
server:
port: 8181
spring:
application:
name: nets-alert-service
---
spring:
cloud:
config:
name: notification-service
uri: http://localhost:8888
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
---
spring:
cloud:
stream:
bindings:
userRegistraionProduces:
destination: userOperations
input:
content-type: application/json
我试过了。类绑定,那一次我从队列中得到了一条确切的消息。因此,如果这个IntegrationFlow配置中有任何错误,请告诉我。因为我是spring cloud stream和IntegrationFlow的新手。有没有办法将这个ascii转换成精确的字符串?提前谢谢
使用IntegrationFlows。from(channel)
不提供任何转换提示,因此您只需获取原始的字节[]
有效负载(包含JSON)。不清楚为什么要使用toJson()
转换器。
你的。句柄(String.class,(有效载荷,标题)-
在任何情况下,您都没有正确使用该框架。使用
@StreamListener("userRegistraionProduces")
public void listen(UserDTO dto) {
System.out.println(dto);
}
...框架会为你处理转换。或者...
@StreamListener("userRegistraionProduces")
public void listen(Message<UserDTO> dtoMessage) {
System.out.println(dtoMessage);
}
如果你的制作人在标题中传达额外信息。
编辑
如果你更喜欢自己做转换,这很好。。。
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows.from(u.userRegistraionProduces())
.transform(Transformers.fromJson(UserDTO.class))
.handle((payload, headers) -> {
System.out.println(payload.toString());
return null;
}).get();
}
...因为Json转换器可以读取
字节[]
。
我正在尝试用spring cloud stream实现spring cloud契约。我有一个使用StreamBridge的制作人 方法sendMessage()是从rest控制器调用的。 我的合同是这样的: 当我运行测试时,会调用triggerCreateOrganization()方法,并在日志中看到日志消息“生产组织到主题”。 我在生成的测试的基类上有@AutoConfigureMessage
我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了
我正在尝试用Java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但是当我重新启动消费者时,问题就出现了,其中一些已经使用的消息再次被消费者从Kafka中拾取(不是所有的消息,而是最近使用的一些消息)。 我已在我的使用者中设置了,并且我的属性设置为1000毫秒。 “重新传递一些已使用的消息”是一个已知的问题,还是有任何其他设置,我没有在这里? 基本上,有没有
我正在使用JavaMail应用编程接口来获取一些电子邮件。我想得到一个消息流,然后在另一边得到一个电子邮件流。此外,我不想失去任何属性,如附件、目的地、发送者、正文等... 如何才能做到这一点?