当前位置: 首页 > 知识库问答 >
问题:

最佳媒体类型,可通过服务器发送事件将 Flux 提供给 Angular/RxJs/observer 尽快返回 Flux

蒙经纶
2023-03-14

上下文:Spring Web Flux以服务器已发送事件样式将Flux返回到Angular。

个人知识:

根据其他stackoverflow主题

媒体类型applicationjson将在内存中缓冲流量,并在一次传递中将其序列化。

媒体类型applicationstream json将在网络上刷新流量输入的每个元素。当流是无限的,或者当您想在信息可用时立即将信息推送到客户端时,这种行为非常方便。

基于前两个前提,我选择了applicationstream json,因为我希望“在信息可用时尽快将其推送到客户端”,而不是“在内存中缓冲流量并在一次传递中序列化”。

好吧,如果我添加MediaType.APPLICATION_STREAM_JSON,就像我在WebFluxendpoint中一样,那么从Spring Webclient读取时就可以了,但是它不适用于通过SSE连接的Angular / RxJs。

// Delay 0.5 second
@GetMapping(path = "/search-with-delay/{parte_da_palavra}")
public Flux<Sugestao> getSugestoesDelay(@PathVariable("parte_da_palavra") String parte_da_palavra) {
    return sugestaoService.findAllMySugestoes("name", parte_da_palavra).delayElements(Duration.ofMillis(500));
}

@GetMapping(path = "/search/{parte_da_palavra}")
public Flux<Sugestao> getSugestoes(@PathVariable("parte_da_palavra") String parte_da_palavra) {
    return sugestaoService.findAllMySugestoes("name", parte_da_palavra);
}

app.component.ts

import { Component, OnInit } from '@angular/core';

//SERVICES
import { SseService } from './sse.service';
import { NosseService } from './nosse.service';

//MODELS
import { Sugestao } from './sugestao';

//RXJS
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  sugestoes$: Observable<any>;
  restItems: any;

  //### SSE
  searchSseWithDelay(searchValue: string): void {
    this.sugestoes$ = this.sseService
      .getServerSentEvent("http://localhost:8080/sugestao/search-with-delay/" + searchValue);
  }

  searchSseWithoutDelay(searchValue: string): void {
    this.sugestoes$ = this.sseService
      .getServerSentEvent("http://localhost:8080/sugestao/search/" + searchValue);
  }

  //### Without SSE
  searchWithDelay(searchValue: string): void {
    this.sugestoes$ = this.nosseService.getWithoutServerSentEvent("http://localhost:8080/sugestao/search-with-delay/" + searchValue);
  }

  searchWthoutDelay(searchValue: string): void {
    this.sugestoes$ = this.nosseService.getWithoutServerSentEvent("http://localhost:8080/sugestao/search/" + searchValue);
  }

  constructor(
    private sseService: SseService, private nosseService: NosseService) { }

  ngOnInit() {
  }


}

及其服务

无上证

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Sugestao } from './sugestao';
import { HttpClient } from '@angular/common/http';
import { map, tap } from 'rxjs/operators';

@Injectable({
    providedIn: "root"
})
export class NosseService {

    constructor(private http: HttpClient) { }

    getWithoutServerSentEvent(url: string): Observable<any> {

        return this.http
            .get<Sugestao[]>(url)
            .pipe()
            ;
    }
}

与上交所合作

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Sugestao } from './sugestao';


@Injectable({
  providedIn: "root"
})
export class SseService {
  sugestoes: Sugestao[] = [];
  constructor(private _zone: NgZone) { }

  getServerSentEvent(url: string): Observable<any> {
    this.sugestoes = [];
    return Observable.create(observer => {
      const eventSource = this.getEventSource(url);
      eventSource.onmessage = event => {
        this._zone.run(() => {
          let json = JSON.parse(event.data);
          this.sugestoes.push(new Sugestao(json['id'], json['name'], json['phone'], json['account']));
          observer.next(this.sugestoes);
        });
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      }

    });
  }
  private getEventSource(url: string): EventSource {
    return new EventSource(url);
  }


}

谷歌搜索我发现基本上说我必须使用文本/事件流

“如果我们在不使用 Accept 标头的情况下请求内容,或者将其设置为 application/json,我们将得到一个同步的 JSON 格式响应。如果我们想在Spring中使用Server-Sent Events支持来实现我们的完整反应式堆栈,我们会在请求中(显式或在幕后)将Accept标头设置为文本/事件流,从而激活Spring中的反应式功能。

最后阅读我在Mozila文档中发现的HTML5

EventSource实例打开到HTTP服务器的持久连接,HTTP服务器以文本/事件流格式发送事件。该连接保持打开,直到调用EventSource.close()关闭

在那之后,我真的很困惑。我肯定会使用Angular/RxJs/SSE,我的意思是,我根本不会在我的客户端使用Spring WebClient。据我所知,MediaType.APPLICATION_STREAM_JSON是我业务需求中的最佳选择,但似乎因为我使用SSE,我必须更喜欢MediaType.TEXT_EVENT_STREAM当尝试使用任何返回事件流的MediaType时,我习惯于编写观察者的方式是不正确的。

所以我直截了当的问题是:什么是最合适的MediaType设置在我的endpoint打算返回Flux很快数据可用的Angular/RxJs/Oberver使用超文本标记语言5服务器发送事件功能?

共有1个答案

暴夕
2023-03-14

Spring开发者表示:

“应用程序/流json”已被弃用,取而代之的是“应用程序/x-ndjson”。杰克逊编码器和解码器现在也支持后者。虽然它不是一种官方注册的媒体类型,但至少它是一种似乎正在使用的常见约定,而JSON lines甚至没有这种约定。还有JSON文本序列,可以使用still和换行符作为分隔符,但是“application/json-seq”需要向Jackson编码器和解码器注册。至于换行符以外的分隔符,我不确定Jackson的非阻塞解析器是否支持。

总之,使用行分隔的 JSON,其中“application/x-ndjson”支持作为开箱即用的媒体类型,并用作“application/stream json”的替代品。或者,注册任何其他哑剧类型,只要它是行分隔的 JSON 格式,它的工作方式就相同。哑剧类型本身只是描述格式的约定,只要双方都理解它,就足够了。

 类似资料:
  • 使用系统将“服务器发送事件(SSE)样式”事件流式传输到F#中的前端的轻量级方式是什么。网Http库?我了解事件流格式(例如,这个PHP示例代码),但我正在寻求一些指导,以便在服务器端F#应用程序(我在.Net Framework 4.8上)中实现流部分。

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 如果我在浏览器中点击http://localhost:8080/test,它只返回4个预期的事件。 你能解释一下我缺了什么吗?多谢了。

  • 问题内容: 我正在尝试使用SSE将JSON数据发送到浏览器,但似乎无法正确处理,而且我也不知道为什么。 服务器端看起来像这样: 如您所见,我已经注释掉了帖子内容,但最终我希望将testdata用作JSON本身,如下所示: 客户端看起来像这样: 我看到控制台日志,但 没有看到警报。 问题答案: 尝试发送适当的JSON(输出中未引用): 但最好:

  • 我试图让服务器发送的事件与Mozilla Firefox一起工作。给定一个Spring Boot的网络服务 使用Chrome浏览器或Edge(始终是最新版本)可以正常工作。我可以在网络分析器选项卡中看到未完成的请求,并且每秒都会显示一个新的时间戳。 然而,当我使用Firefox(84.0.2或更早版本)时,请求也会显示在网络选项卡中,但不会显示响应头或流数据。当我终止Spring后端时,Firef