我正在试验Spring Webflux和Spring集成,以从JMS队列创建反应流(Flux)。
我试图从JMS队列(使用Spring集成的IBM MQ)创建一个反应流(Spring Webflux),以便客户端异步获取JMS消息。我相信我已经把一切都正确地连接起来了,因为这些信息正被被动的听众所消耗。然而,我的反应流量流无法显示这些消息。任何帮助都将不胜感激。
这是我用来使我的JMS侦听器响应的代码:
UM网关
@Named
@Slf4j
public class UmGateway {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private JmsTemplate jmsTemplate;
@Value("${um.mq.queueName}")
private String queueName;
@Bean
public Publisher<Message<MilestoneNotification>> jmsReactiveSource() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination(queueName))
.channel(MessageChannels.queue())
.log(Level.DEBUG)
.log()
.toReactivePublisher();
}
public Flux<MilestoneNotification> findAll() {
return Flux.from(jmsReactiveSource())
.map(Message::getPayload);
}
/**
* Method which sends Milestone Notifications to the UM Queue.
*/
public void send(final MilestoneNotification message) {
jmsTemplate.convertAndSend(queueName, message);
}
}
控制器
@RestController
@Slf4j
@RequiredArgsConstructor
@RequestMapping(ApiConstants.MILESTONE_UM)
public class MilestoneUmController {
@Autowired
private UmGateway umGateway;
@RequestMapping(value = "/message", method = RequestMethod.POST)
public ResponseEntity<Boolean> sendMessage(
final @RequestBody MilestoneNotification notification) {
umGateway.send(notification);
return new ResponseEntity<>(HttpStatus.OK);
}
@GetMapping(path = "/milestone-notification/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<MilestoneNotification> feed() {
return this.umGateway.findAll();
}
}
以下是日志:
- 2020.02.06 13:53:04.900 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","NotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997184878, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13530511, id=5d277be2-49f5-3e5d-8916-5793db3b76e7, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e03738521, timestamp=1580997184900}]
- 2020.02.06 13:53:04.968 [qtp2132762784-23] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:53:53.521 [qtp2132762784-18] INFO c.j.w.p.u.RequestLoggingInterceptor - Received GET request to /common/dataservice/di/milestone/um/milestone-notification/stream with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.070 [qtp2132762784-16] INFO c.j.w.p.u.RequestLoggingInterceptor - Received POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
- 2020.02.06 13:54:09.541 [jmsReactiveSource.org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO o.s.i.h.LoggingHandler - GenericMessage [payload={"messageId":"MAHESH_007","diNotificationTag":"MAHESH_007","messageTimeStamp":"2020-01-21T10:56:33Z","processMilestoneId":"MAHESH_007","processMilestoneName":"MAHESH_007","accountNumber":"12345","previousStatus":"In Progress","currentStatus":"complete","isNew":true}, headers={JMS_IBM_Character_Set=UTF-8, JMS_IBM_MsgType=8, jms_destination=queue:///NOTIFICATIONQUEUE, _type=com.jpmc.wss.portal.domain.um.MilestoneNotification, JMSXUserID=cibcfdid , JMS_IBM_Encoding=273, priority=4, jms_timestamp=1580997249519, JMSXAppID=jpmc.wss.portal.Application , JMS_IBM_PutApplType=28, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20200206, JMSXDeliveryCount=1, JMS_IBM_PutTime=13540975, id=5421898e-5ef6-1f9b-aaa6-81ebc7668f50, jms_messageId=ID:414d51204e41544d31383820202020201d9f3b5e08738521, timestamp=1580997249541}]
- 2020.02.06 13:54:09.593 [qtp2132762784-16] DEBUG c.j.w.p.u.RequestLoggingInterceptor - Returning status code 200 for POST request to /common/dataservice/di/milestone/um/message with query=[null] and http-user=[null]
浏览器上的流量流
因此,当您需要服务器端事件时,您不能只从浏览器发出URL请求。
您需要从网页中使用一些特定的JavaScript API:https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
我可以使用curl-N工具进行测试:curl-Structuring请求验证服务器发送的事件
或使用WebTestClient
表单Spring WebFlux:
Flux<String> stream =
this.webTestClient.get().uri("/stream")
.exchange()
.returnResult(String.class)
.getResponseBody();
StepVerifier
.create(stream)
.expectNext("m1", "m2", "m3")
.thenCancel()
.verify();
我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。
如果我创建上面的类并尝试在tomcat7上部署war,我会看到以下错误。
我正在开发一个使用JMS作为消息传递层的应用程序。我还使用glassfish来托管jms/mq后端。该应用程序能够使用我最初设置的glassfish 3.1服务器中的连接工厂和主题进行发布/订阅消息传递。我现在有了glassfish (4.1)的另一个实例,它托管了一组新的应用程序使用的新功能,但是我仍然需要使用第一个glassfish服务器广播的消息。事实上,客户端使用特定于glassfish
因此,我使用Spring integration链接JMS和ActiveMQ,如下所示:- 如何使其工作,以便发送到此队列并从中接收消息?请帮忙。
我是这个Spring集成和JMS的新手,我开始使用它。在这里,我想通过activemq创建普通的jms消息,并通过spring inbound适配器(消息驱动)接收它。 以下是我的spring配置文件 这是我的测试课。 } 但问题是我不能保证交货。有些时候程序不能接收消息,有些时候它成功了,但有一些警告,如 无法刷新目标“queue://MSG_QUEUE”的JMS连接,将在5000毫秒后重试,原
我使用一个生产者在本地主机上运行的Kafka服务器中输入了一些消息。Zookeeper也在本地主机上。我使用了这里给出的。 但是,消费者似乎没有收到任何消息! 脚本可以提取所有这些消息,但代码不能。怎么了?消费者代码与该页面上给出的代码完全相同。 这是我发布消息的同一主题。以下是制作人的代码: