当前位置: 首页 > 面试题库 >

如何从ActiveMQ队列创建Spring Reactor Flux?

岑彬炳
2023-03-14
问题内容

我正在尝试使用Spring Reactor 3组件和Spring Integration从JMS队列创建反应式流(Flux)。

我正在尝试从JMS队列(使用Spring Integration的ActiveMQ)创建客户端的响应流(Spring Reactor 3
Flux),以使客户端异步获取JMS消息。我相信我已经正确连接了所有东西,但是在服务器停止之前,客户端不会收到任何JMS消息。然后,所有消息一次被“推送”到客户端。

任何帮助,将不胜感激。

这是我用来配置JMS,集成组件和反应式发布者的配置文件:

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(jmsConnectionFactory());
        template.setDefaultDestinationName(patientQueue);
        template.setMessageConverter(jacksonJmsMessageConverter());
        return template;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public AbstractMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
        defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
        defaultMessageListenerContainer.setDestinationName(patientQueue);
        defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
        defaultMessageListenerContainer.setCacheLevel(100);
        defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                t.printStackTrace();
            }
        });

        return defaultMessageListenerContainer;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }


    @Bean
    public MessageChannel jmsOutboundInboundReplyChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public Publisher<Message<String>> pollableReactiveFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @Bean
    public MessageChannel jmsChannel() {
        return new DirectChannel();
    }

创建磁通的控制器为:

@RestController
@RequestMapping("patients")
public class PatientChangePushController {
    private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
    private int durationInSeconds = 30;
    private Patient patient;
    AtomicReference<SignalType> checkFinally = new AtomicReference<>();

    @Autowired
    PatientService patientService;

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private
    Publisher<Message<String>> pollableReactiveFlow;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue patientQueue;

    /**
     * Subscribe to a Flux of a patient that has been updated.
     *
     * @param id
     * @return
     */
    @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {

        Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
        return messageFlux;
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (long i = 0L; i < 100; i++) {
            Patient patient = new Patient();
            patient.setId(i);
            send(patient);
            System.out.println("Message was sent to the Queue");
        }

    }

    void send(Patient patient) {
        this.jmsTemplate.convertAndSend(this.patientQueue, patient);
    }

}

如果有人能告诉我为什么直到服务器被杀死后才将消息发送给客户端,我将不胜感激。


问题答案:

对我来说效果很好:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Bean
    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getPatientAlerts() {
        return Flux.from(jmsReactiveSource())
                .map(Message::getPayload);
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (int i = 0; i < 100; i++) {
            this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
        }
    }

}

在一个终端中,我有curl http://localhost:8080/events一个等待来自那里的SSE Flux

在另一个终端中,我执行curl http://localhost:8080/generate并在第一个终端中看到:

data:testMessage #1

data:testMessage #2

data:testMessage #3

data:testMessage #4

我使用Spring Boot 2.0.0.BUILD-SNAPSHOT。

另请参阅此处:https : //spring.io/blog/2017/03/08/spring-tips-server-sent-events-
sse



 类似资料:
  • 我现有的使用阻止队列的代码创建了一个阻止队列列表(如私有列表 任何帮助将不胜感激。

  • 我正在使用SpringReactor3组件和SpringIntegration进行实验,以从JMS队列创建反应流(Flux)。 我试图从JMS队列(使用SpringIntegration的ActiveMQ)创建一个反应流(SpringReactor3Flux),以供客户端异步获取JMS消息。我相信我已经正确连接了所有内容,但在服务器停止之前,客户端不会接收任何JMS消息。然后,所有消息都被“推送”

  • 因为正如我在Active MQ Artemis文档中看到的,持久值是一个布尔值,但在amqpnetlite库中它是一个uint,我的理解是,超过0的所有内容都应该是true,而0应该是false。 起初,这种行为非常奇怪:即使当Aretemis Web界面显示为持久队列时,一旦没有用户连接,它也会被删除。 我发现:ActiveMQ Artemis queue在关闭消费客户机后被删除,这描述了即使是

  • 我试图实现从阻塞队列创建的Reactor通量,但不确定哪个操作符最适合我的用例? 我正在创建一个流式RESTendpoint,其中的响应是流量,需要不断从阻塞队列中发出消息,作为获取REST调用的响应。 我已经尝试过论坛和留档,只能找到从可迭代集合或响应数据源发起的Flux,但没有任何BlockingQueue的示例。

  • 我是activeMQ的新手,在将消息从驻留在另一台服务器上的消息生成器推送到activeMQ定义的队列时遇到问题。 我在activeMQ上使用camel routes创建的应用程序中有几个队列。我尝试从另一台服务器上的应用程序对这些队列执行远程JNDI查找。我使用了来自http://activemq.apache.org/jndi-support.html页面的activemq文档片段。 我可以连