当前位置: 首页 > 知识库问答 >
问题:

从另一种方法发送服务器事件

益光亮
2023-03-14

我试图实现一个服务器发送的事件控制器,用最新的数据更新我的Web浏览器客户端。

这是我当前的控制器,每5秒发送一次我的数据列表。我想在每次将数据保存在其他服务中时发送 SSE。我阅读了有关使用通道的信息,但是如何将其与 Flux 一起使用?

@GetMapping("/images-sse")
fun getImagesAsSSE(
    request: HttpServletRequest
): Flux<ServerSentEvent<MutableList<Image>>> {
    val subdomain = request.serverName.split(".").first()
    return Flux.interval(Duration.ofSeconds(5))
        .map {
            ServerSentEvent.builder<MutableList<Image>>()
                .event("periodic-event")
                .data(weddingService.getBySubdomain(subdomain)?.pictures).build()
        }
}

共有2个答案

阚正真
2023-03-14

我终于成功了。我还使用cookie添加了特定于用户的更新。

这是我的SSE控制器

@RestController
@RequestMapping("/api/sse")
class SSEController {

    val imageUpdateSink : Sinks.Many<Wedding> = Sinks.many().multicast().directBestEffort()
    @GetMapping("/images")
    fun getImagesAsSSE(
        request: HttpServletRequest
    ): Flux<ServerSentEvent<MutableList<Image>>> {
        val counter: AtomicLong = AtomicLong(0)
        return imageUpdateSink.asFlux()
            .filter { wedding ->
                val folderId = request.cookies.find {cookie ->
                    cookie.name == "folderId"
                }?.value

                folderId == wedding.folderId
            }.map { wedding ->
                     ServerSentEvent.builder<MutableList<Image>>()
                        .event("new-image")
                        .data(
                            wedding.pictures
                        ).id(counter.incrementAndGet().toString())
                        .build()
            }
    }
}

在我的服务中,我的数据被更新:

val updatedImageList = weddingRepository.findByFolderId(imageDTO.folderId)
sseController.imageUpdateSink.tryEmitNext(
    updatedImageList
)

我的Javascript看起来像这样:

document.cookie = "folderId=" + [[${wedding.folderId}]]
const evtSource = new EventSource("/api/sse/images")
evtSource.addEventListener("new-image", function(alpineContext){
    return function (event) {
        console.log(event.data)
        alpineContext.images = JSON.parse(event.data)
    };
}(this))
蔚宏大
2023-03-14

控制器的示例代码:

package sk.qpp;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.AtomicLong;

@Controller
@Slf4j
public class ReactiveController {
    record SomeDTO(String name, String address) {
    }

    private final Sinks.Many<SomeDTO> eventSink = Sinks.many().multicast().directBestEffort();

    @RequestMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<SomeDTO>> sse() {
        final AtomicLong counter = new AtomicLong(0);
        return eventSink.asFlux()
                .map(e -> ServerSentEvent.builder(e)
                        .id(counter.incrementAndGet() + "")
                        //.event(e.getClass().getName())
                        .build());
    }

    // note, when you want this to work in production, ensure, that http request is not being cached on its way, using POST method for example.
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    @GetMapping(path = "/sendSomething", produces = MediaType.TEXT_PLAIN_VALUE)
    public String sendSomething() {
        this.eventSink.emitNext(
                new SomeDTO("name", "address"),
                (signalType, emitResult) -> {
                    log.warn("Some event is being not send to all subscribers. It will vanish...");
                    // returning false, to not retry emitting given data again.
                    return false;
                }
        );
        return "Have a look at /sse endpoint (using \"curl http://localhost/sse\" for example), to see events in realtime.";
    }
}

Sink被用作一些“自定义通量”,您可以在其中放置任何内容(使用emitNext),并从中获取(使用asFlux()方法)。

在设置了示例控制器之后,在浏览器中打开http://localhost:9091/send something(即,对其执行get请求),并在控制台发出命令< code > curl http://localhost:9091/sse 来查看SSE事件(在每个GET请求之后,应该会出现新的事件)。也可以在chromium浏览器中直接看到sse事件。Firefox尝试下载并保存到文件系统作为文件(也可以)。

 类似资料:
  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 我试图让服务器发送的事件与Mozilla Firefox一起工作。给定一个Spring Boot的网络服务 使用Chrome浏览器或Edge(始终是最新版本)可以正常工作。我可以在网络分析器选项卡中看到未完成的请求,并且每秒都会显示一个新的时间戳。 然而,当我使用Firefox(84.0.2或更早版本)时,请求也会显示在网络选项卡中,但不会显示响应头或流数据。当我终止Spring后端时,Firef

  • 我有姜戈问题。我想将来自浏览器或django服务器上的业务逻辑的数据发送到另一个django服务器或只是同一服务器但不同端口来处理请求。我该怎么办?我试着用套接字来实现,但似乎不起作用。

  • 如果我正确理解了服务器发送事件的原则,那么每次客户端注册到EventSource时,它实际上会打开一个到管理事件的资源的新HTTP连接。与其他HTTP请求相反,连接保持活动状态,因此专用于此客户端的服务器进程/线程将一直运行,直到客户端断开连接。 如果我们有1000个客户端使用SSE连接到一个应用程序,该怎么办?我们是否会有1000个进程/线程(做同样的事情)并发运行以处理SSE?我想我错了,但如