当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring订阅传入的SSE事件

哈朗
2023-03-14

我编写了一个Spring RestController,它返回一个SseEmitter(对于服务器发送的事件),并向每个事件添加HATEOAS链接。以下是该控制器的一个简化但有效的示例:

package hello;

import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.methodOn;
import hello.Greeting.Status;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class GreetingController {

    private static final Logger log = LoggerFactory.getLogger(GreetingController.class);

    private static final String template = "Hello, %s!";

    class GreetingRequestHandler implements Runnable {

        private ResponseBodyEmitter emitter;
        private Greeting greeting;

        public GreetingRequestHandler(final ResponseBodyEmitter emitter, final Greeting greeting) {
            this.emitter = emitter;
            this.greeting = greeting;
        }

        @Override
        public void run() {
            try {
                log.info(this.greeting.toString());
                this.emitter.send(this.greeting);
                Thread.sleep(5000);
                if (Status.COMPLETE.equals(this.greeting.getStatus())) {
                    this.emitter.complete();
                } else {
                    this.greeting.incrementStatus();
                    new Thread(new GreetingRequestHandler(this.emitter, this.greeting)).start();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @RequestMapping(path = "/greeting")
    public SseEmitter greeting(@RequestParam(value = "name", defaultValue = "World") final String name) {
        SseEmitter emitter = new SseEmitter();
        Greeting greeting = new Greeting(String.format(template, name));
        greeting.add(linkTo(methodOn(GreetingController.class).greeting(name)).withSelfRel());
        new Thread(new GreetingRequestHandler(emitter, greeting)).start();
        log.info("returning emitter");
        return emitter;
    }
}

问候语课程如下:

package hello;

import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.hateoas.ResourceSupport;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Greeting extends ResourceSupport {

    private final String content;
    private final static AtomicInteger idProvider = new AtomicInteger();
    private int greetingId;
    private Status status;

    enum Status {
        ENQUEUED,
        PROCESSING,
        COMPLETE;
    }

    @JsonCreator
    public Greeting(@JsonProperty("content") final String content) {
        this.greetingId = idProvider.addAndGet(1);
        this.status = Status.ENQUEUED;
        this.content = content;
    }

    public Status getStatus() {
        return this.status;
    }

    protected void setStatus(final Status status) {
        this.status = status;
    }

    public int getGreetingId() {
        return this.greetingId;
    }

    public String getContent() {
        return this.content;
    }

    @Override
    public String toString() {
        return "Greeting{id='" + this.greetingId + "', status='" + this.status + "' content='" + this.content + "', " + super.toString() + "}";
    }

    public void incrementStatus() {
        switch (this.status) {
            case ENQUEUED:
                this.status = Status.PROCESSING;
                break;
            case PROCESSING:
                this.status = Status.COMPLETE;
                break;
            default:
                break;
        }
    }
}

此代码运行完美。如果我尝试使用Web浏览器访问REST服务,我会看到显示正确内容和链接的事件。

结果如下所示(每个事件出现在前一个事件之后5秒):

data:{"content":"Hello, Kraal!","greetingId":8,"status":"ENQUEUED","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}}
data:{"content":"Hello, Kraal!","greetingId":8,"status":"PROCESSING","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}}
data:{"content":"Hello, Kraal!","greetingId":8,"status":"COMPLETE","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}}

现在我需要调用这个REST服务并从另一个Spring应用程序读取这些事件。。。但我不知道如何使用Spring编写客户机代码。这不起作用,因为RestTemplate是为同步客户端HTTP访问而设计的。。。

    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.registerModule(new Jackson2HalModule());

    // required for HATEOAS
    MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
    converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json"));
    converter.setObjectMapper(mapper);

    // required in order to be able to read serialized objects
    MappingJackson2HttpMessageConverter converter2 = new MappingJackson2HttpMessageConverter();
    converter2.setSupportedMediaTypes(MediaType.parseMediaTypes("application/octet-stream"));
    converter2.setObjectMapper(mapper);

    // required to understand SSE events
    MappingJackson2HttpMessageConverter converter3 = new MappingJackson2HttpMessageConverter();
    converter3.setSupportedMediaTypes(MediaType.parseMediaTypes("text/event-stream"));

    List<HttpMessageConverter<?>> converters = new ArrayList<HttpMessageConverter<?>>();
    converters.add(converter);
    converters.add(converter2);
    converters.add(converter3);

    // probably wrong template
    RestTemplate restTemplate = new RestTemplate();
    restTemplate = new RestTemplate(converters);
    // this does not work as I receive events and no a single object
    Greeting greeting = restTemplate.getForObject("http://localhost:8080/greeting/?name=Kraal", Greeting.class);
    log.info(greeting.toString());

我得到的错误消息是:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'data': was expecting ('true', 'false' or 'null')

事实上,每个事件都是一个SSE事件,并以“data:”开头...

因此,问题是:

  • 我应该注册什么ObjectMapper模块才能与Jackson一起映射SSE?
  • 如何使用Spring订阅传入的SSE事件(观察者模式)?

提前谢谢。

旁注:当我使用Spring努力做到这一点时,我尝试使用泽西SSE支持来做到这一点,如下所示。使用泽西我按预期接收事件,但我无法将它们强制转换为问候类(出于与上述相同的原因,我想这是我没有正确的转换器模块。):

Client client = ClientBuilder.newBuilder().register(converter).register(SseFeature.class).build();
WebTarget target = client.target("http://localhost:8080/greeting/?name=Kraal");
EventInput eventInput = target.request().get(EventInput.class);
while (!eventInput.isClosed()) {
    final InboundEvent inboundEvent = eventInput.read();
    if (inboundEvent == null) {
        // connection has been closed
        break;
    }
    // this works fine and prints out events as they are incoming
    System.out.println(inboundEvent.readData(String.class));
    // but this doesn't as no proper way to deserialize the
    // class with HATEOAS links can be found
    // Greeting greeting = inboundEvent.readData(Greeting.class);
    // System.out.println(greeting.toString());
}

共有1个答案

拓拔玺
2023-03-14

根据文件

您可以使用inboundEvent。readData(类

 类似资料:
  • 我喜欢做一个有redis回应的SSE。在quarkus中订阅。 我有一个来自quarkus快速入门的简单SSE示例 这个效果很好,每2秒钟我就会收到Hello。。。。在我的web浏览器中 现在我尝试订阅Redis,所以我应该会收到Redis的消息。 Redis示例: 现在,我用quarkus SSE尝试以下方法: 我收到的是一个例外: 有人能支持我吗?有一个简单的例子吗?我对此一无所知,我无法接收

  • 我有一个Spring靴2.0.0。M7 Spring Webflux应用程序,其中我使用的是Thymeleaf Reactive。 我注意到,在我的微服务上,当我在SSE模式(文本/事件流)下调用一个返回数据流的endpoint时,即使该数据流已被正确处理,也会在该数据流上发生cancel()。 例如,这里有一个简单的控制器endpoint: 以下是我在SSE模式下请求时得到的订阅流量日志: 我们

  • Node.js应用程序可以使用composer-client.BusinessNetworkConnection.onAPI调用从业务网络订阅事件。事件在业务网络模型文件中定义,并由交易处理函数文件中的指定交易处理。有关发布事件的更多信息,请参阅发布事件。 在你开始之前 在应用程序可以订阅事件之前,你必须定义一些事件和发送它们的交易。还必须部署业务网络,并且必须具有可连接到该业务网络的连接配置文件

  • 基本上,我试图使用ARM部署一个事件网格订阅来收集订阅中的特定事件(主题类型= Azure订阅)。我已经有一个创建了事件网格触发功能的功能应用程序,只需要将该功能与事件网格订阅绑定为webhook。 我正在使用Azure DevOps中的发布管道来自动化整个工作流。 以下是我使用的一个示例: 这最终部署了事件网格主题,而不是事件网格订阅。 然后,有人建议我尝试以下操作: 但是这最终以这个错误而失败

  • 语境: 给定一个WooCommerce和WooCommerce订阅的WordPress网站,我试图获取特定用户订阅的列表。在最近的更新之前,这一行代码已经为我解决了这个问题。以下是我一直在使用的代码: 其中$user\u id是WordPress中的有效用户id。 问题: 自上次更新以来,我们经常看到以下错误: 致命错误:在/home/warfarep/public_html/wp content

  • 我用Spring4、STOMP和sock做了一个简单的web套接字通信。js,接下来呢,https://spring.io/guides/gs/messaging-stomp-websocket/ 现在我想升级到简单聊天。我的问题是,当用户订阅新的聊天室时,他应该得到过去的消息。我不知道如何捕捉他订阅发送邮件列表的那一刻。 我尝试使用@MessageMapping注释,但没有成功: 然后我创造了: