当前位置: 首页 > 知识库问答 >
问题:

Spring 5 Web Reactive-Hot Publishing-如何使用EmitterProcessor将MessageListener桥接到事件流

澹台建华
2023-03-14

一个示例项目位于以下位置:https://github.com/codependent/spring5-playground

@Component
public class AlertEmitterProcessor {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private EmitterProcessor<Alert> processor;

    public AlertEmitterProcessor(){
        processor = EmitterProcessor.<Alert>create();
        processor.connect();
    }

    public EmitterProcessor<Alert> getProcessor() {
        return processor;
    }

    public void onNext(Alert alert){
        logger.info("onNext [{}]", alert);
        processor.onNext(alert);
    }

    public void onComplete(){
        logger.info("onComplete");
        processor.onComplete();
    }

    public void onError(Throwable t){
        logger.error("onError", t);
        processor.onError(t);
    }
}
@Component
public class AlertMessageListener implements MessageListener{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired 
    private AlertEmitterProcessor alertProcessor;

    @Autowired
    private MappingJackson2HttpMessageConverter jacksonMessageConverter;

    @Override
    public void onMessage(Message message) {
        logger.info("Message received: [{}]", message);
        TextMessage tm = (TextMessage)message;
        try {
            Alert alert = jacksonMessageConverter.getObjectMapper().readValue(tm.getText(), Alert.class);
            alertProcessor.onNext(alert);
        } catch (IOException | JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
@Autowired
private AlertEmitterProcessor alertTopicProcessor;

@Autowired 
private AlertMessageListener messageListener;

@Autowired
private MappingJackson2HttpMessageConverter jacksonMessageConverter;

@GetMapping(value="/accounts/{id}/alerts/live2", produces="text/event-stream")
public Flux<Alert> getAccountAlertsStreaming2(@PathVariable Integer id) {
    return alertTopicProcessor.getProcessor()
        .log().filter( a -> a.getAccountId().equals(id) );
}
@GetMapping(value="/mock/accounts/{id}/alerts/put", produces="text/event-stream")
public void putAlert(@PathVariable Integer id) throws JsonProcessingException {
    Alert alert = new Alert(id, (long)Math.round(Math.random()*10), "Message");
    String alertStr = jacksonMessageConverter.getObjectMapper().writeValueAsString(alert);
    TextMessage tm = new MockTextMessage(alertStr);
    messageListener.onMessage(tm);
}
2016-10-03 13:43:38.755 DEBUG 12800 --- [nio-8080-exec-1] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/accounts/1/alerts/live2]
2016-10-03 13:43:38.770 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /accounts/1/alerts/live2
2016-10-03 13:43:38.778 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:43:38.779 DEBUG 12800 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:38.800  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@54d4fb36)
2016-10-03 13:43:38.802  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : request(unbounded)
2016-10-03 13:43:38.803  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onNext(1)
2016-10-03 13:43:38.822  INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2          : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@227405f2)
2016-10-03 13:43:38.822  INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2          : request(1)
2016-10-03 13:43:38.823  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onComplete()
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/mock/accounts/1/alerts/put]
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /mock/accounts/1/alerts/put
2016-10-03 13:43:43.068 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public void com.codependent.spring5.playground.reactive.web.AccountsRestController.putAlert(java.lang.Integer) throws com.fasterxml.jackson.core.JsonProcessingException]
2016-10-03 13:43:43.069 DEBUG 12800 --- [nio-8080-exec-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:43.071  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@2ba7d09c)
2016-10-03 13:43:43.071  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : request(unbounded)
2016-10-03 13:43:43.072  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onNext(1)
2016-10-03 13:43:43.112  INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertMessageListener   : Message received: [com.codependent.spring5.playground.reactive.message.MockTextMessage@37262c9e]
2016-10-03 13:43:43.145  INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertEmitterProcessor  : onNext [Alert [alertId=3, message=Message, accountId=1]]
2016-10-03 13:43:43.146  INFO 12800 --- [nio-8080-exec-2] reactor.Flux.EmitterProcessor.2          : onNext(Alert [alertId=3, message=Message, accountId=1])
2016-10-03 13:43:43.177  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onComplete()
2016-10-03 13:43:43.177 DEBUG 12800 --- [nio-8080-exec-2] o.s.h.s.r.ServletHttpHandlerAdapter      : Successfully completed request
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@909f06f)
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : request(unbounded)
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onNext(1)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@7ce1f3e)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onNext(Alert [alertId=4, message=Message, accountId=1])
2016-10-03 13:45:07.730  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onComplete()
2016-10-03 13:45:07.747  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)
2016-10-03 13:45:07.747  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onNext(Alert [alertId=0, message=Message, accountId=1])
2016-10-03 13:45:07.748  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)

...但是很多时候它没有得到任何东西。

共有1个答案

徐皓君
2023-03-14

在Spring5立方米解决了:

  • https://jira.spring.io/browse/spr-14803
  • https://jira.spring.io/browse/spr-14772
 类似资料:
  • 我想要trrigger点击事件上的span标记onclick happed。li创建时创建/单击span标记(创建集合)单击。 下面是我的span和ul li标签。li列表在单击SPAN时动态添加到ul中。 我只想在li上触发click事件-Encat值将为0。否则不需要触发。 下面是我正在尝试的jquery代码,但是它应用了所有的数据猫值(li) 任何人知道怎么做,然后告诉我。

  • 我正在创建一个Spring MessageListenerAdapter,用于侦听XML消息队列。 这是我的amqp配置: 当前,消息的接收工作正常,但仅当我的侦听器的返回类型是字符串时。 通过以下侦听器,我可以接收XML消息,所以这很好。但我无法获取原始消息的消息属性: 如果我将receiveMessage()的返回类型更改为字节[]或消息,则会收到以下错误消息: 我已经尝试过MessageCo

  • 我正在学习jboss Weld Event教程中的Weld Event,我想写一个观察事件并在事件被激发时打印helloword的示例。 这是我的代码: 它不起作用,给出以下异常信息: 容器中似乎没有可以初始化的bean 那么我该怎么做才能使它运行,我的beans.xml是空的 也许我应该在beans.xml中做些什么? 或者我应该编写一个实现事件接口的Java类? 任何内容都将适用。

  • 问题内容: Xcode更新后,我遇到了一个奇怪的问题。在我的应用程序中,当我从API获得响应时,我会手动解析它并将其映射到我的模型。我的代码中有很多地方使用空合并将JSON值转换为如下所示: 它过去可以正常工作,但是在更新后,它始终默认为0。为了确定原因,我尝试强制将其强制转换为如下所示 这导致以下错误 无法将NSNumber桥接到Float 经过进一步调查,我发现Xcode使用的是Swift 3

  • 我正在使用JmsTemplate转换和发送事件对象。我已经向JmsTemplate注册了MappingJackson2MessageConverter。 我正在使用并从该主题获取消息。我不知道如何将此消息转换回我的对象? JmsTemplate的messageconverter是 我尝试调用并传递此消息对象进行转换,但没有成功。如何将我收到的消息对象转换为我想要的自定义对象?

  • 问题内容: 如果我使用画布来显示数据,并且希望用户能够单击画布上的各个项目以获取更多信息或以某种方式与其进行交互,那么最好的方法是什么? 在网上搜索,我可以找到有关如何将事件绑定到标签的信息,但这似乎比我想要的更间接。我不想使用标签对项目进行分组,而是在用户单击画布上的特定项目时进行特定的功能调用。 问题答案: 要与包含在对象中的对象进行交互,您需要使用以下格式的对象: item参数可以是标签或I