我有一个spring boot后端,我想为它实现一个SSEendpoint。我想使用基于Xamarin表单的应用程序使用这个endpoint。
我设法为双方实现了一些例子,但是,我没有在应用程序上收到任何消息。
对于后端部分,我实现了以下示例:
@RequestMapping(value = "/event-stream", method = RequestMethod.GET)
public ResponseEntity<ResponseBodyEmitter> streamEvents() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
executor.execute(() -> {
try {
for (int i = 0; i < 30; i++) {
Thread.sleep(2000);
var msg = new ResponseTestObject("this is a message from " + new Date(), i);
emitter.send(msg, MediaType.APPLICATION_JSON);
}
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return new ResponseEntity<>(emitter, HttpStatus.OK);
}
@AllArgsConstructor
public static class ResponseTestObject {
public String message;
public int id;
}
注意:我特意以达到默认30秒超时的方式实现它。使用postman调用此方法,它将加载所述30秒并同时显示所有已发送的消息:
{
"message": "this is a message from Thu May 05 11:36:25 CEST 2022",
"id": 0
}{
"message": "this is a message from Thu May 05 11:36:27 CEST 2022",
"id": 1
}
[...]
{
"message": "this is a message from Thu May 05 11:36:51 CEST 2022",
"id": 13
}{
"message": "this is a message from Thu May 05 11:36:53 CEST 2022",
"id": 14
}
在我的应用程序部分,我使用了ServiceStack ServerEventsClient:
EventClient = new ServerEventsClient(BackendConnector.BACKEND_HOST + "/events/") {
OnMessage = OnMessage,
OnException = (ex) => {
Console.WriteLine("OnException: " + ex.Message);
}
};
// another REST backend connection is made previously in the app and I use its session cookie for authentication
EventClient.ServiceClient.SetCookie(BackendConnector.SESSION_COOKIE_VALUE, BackendConnector.BackendSessionCookieId);
EventClient.Start();
启动客户端后,我让它定期发布如下状态:
Console.WriteLine("SSE " + EventClient.Status);
我能看到的是以下内容:
补充说明:我还有一个使用 SseEmitter 的服务器端测试实现。在这种情况下,我也可以在“订阅”期间看到应用程序请求,但它会在某个时候超时,并且客户端状态永远不会离开“正在启动”
我的第一个问题显然是:我错过了什么,我永远不会收到消息?
我的第二个问题是:为什么它首先会超时?根据留档,它会发送周期性心跳。我需要在Spring侧采用不同的方法吗?或者我需要为心跳实现一个单独的endpoint?
谢谢你的帮助!
编辑:
遵循@mythz答案并接受在这里使用ServiceStack不是一个好主意,我遵循了以下示例,并使用标准的HttpClient和StreamReader实现了一种简单的方法:https://makolyte.com/event-driven-dotnet-how-to-consume-an-sse-endpoint-with-httpclient/
但是,这导致了我之前使用postman或浏览器时遇到的相同问题。所有消息都在30年代超时后批量发送。因此,我还更改了后端部分并使用Spring Web通量代替:
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> {
var msg = new ResponseTestObject("this is a message from " + new Date(), Math.toIntExact(sequence));
ObjectMapper mapper = new ObjectMapper();
try {
return ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data(mapper.writeValueAsString(msg))
.build();
} catch (JsonProcessingException e) {
return null;
}
});
}
我强烈建议您不要将ServiceStack的ServerEventsClient
与ServiceStack的Server Events Feature之外的任何内容结合起来,这是其所有类型化的Server Events客户端都设计用于使用的功能。
例如。为了启用断开的网络连接和自动重试连接功能,客户端会发回周期性的心跳。这只是ServiceStack实现中的一个功能,因为SSE标准中没有这样的概念。
这只是一个例子,基本上C#服务器事件客户端中的每个高级都使用ServiceStack服务器功能,这些功能在任何其他第三方实现中都不存在。
我的Ruby项目中有一个模型重发,它包含内容和状态列。 使用EventMachine使用状态为0的所有记录的最佳/最快方式是什么? 我想创建一个简单的worker,它尝试在每个时段(比如每5分钟)查找status==0的记录 我对EventMachine还是新手,找不到那么多关于如何处理DB的例子。 到目前为止,我做了如下工作,但不确定这是否是最好的实现: 任何帮助都将不胜感激
在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre
我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co
我有一个管理Web应用程序位于远程服务器上。这个应用程序是使用MEAN堆栈编写的,我有一个连接到网络应用程序所需的所有RESTful路由的列表。 我正在编写一个Java客户端应用程序,它需要从这个管理应用程序发送和接收数据。如果我有服务器的IP地址和REST路由,如何将客户端连接到web应用程序? 我想我需要提供一个到服务器和RESTAPI文件的URL连接,然后只需调用路由函数,如和。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?