当前位置: 首页 > 知识库问答 >
问题:

使用Akka通过WebSocket流化分页API响应

汪德明
2023-03-14

在服务器端,我正在使用一个HTTP API,它以页面形式返回结果。如中所示,响应包含x个结果,如果有超过0的结果,我可以再次调用它以获得下一个x个结果。x可以任意选择直到API的最大页面大小。

现在,我想要在WebSocket上高效地流式传输全套结果,而不会使它不堪重负(施加背压)。最初,我构建了整个resultset,然后从中创建了一个源代码:

getEventsFuture().foreach { events =>
  sender ! Flow.fromSinkAndSource(Sink.ignore, Source(events))
}

这样可以工作,WebSocket客户机以其最大速度接收所有事件。这样做的最大缺点是,在开始向客户机返回数据之前,我必须获取所有页面。理想情况下,我会使用较小的页面大小,并在连接后立即将结果返回给客户机,获取该过程中的下一个页面。

因此,我需要一个流,我可以添加数据的源后,流已经物化。我尝试使用source.actorref:

val events = Source.actorRef[Event](1000, OverflowStrategy.fail).mapMaterializedValue { outActor =>
  sendEvents(outActor)
  NotUsed
}

sender ! Flow.fromSinkAndSource(Sink.ignore, events)

本质上,我使用物化的actorRef并将所有事件发送给它。每次提取页面时,我都会将结果转储给执行元。现在,我对源代码的初始化可能已经告诉您,这并不总是起作用。有时,当响应足够大,而客户端没有像其他时候那样迅速地消耗时,套接字连接就会关闭。我觉得overflowstrategy.fail是反对丢弃事件的正确策略,因为我不希望客户机认为他们得到了所有东西,如果不是这样的话。

我没有要预先为缓冲区设置的sane值,我不想设置int.max或其他东西,因为我认为Akka内部确实为缓冲区大小分配了全部内存

我该怎么解决?我希望所有的事件尽可能快地和适当的背压到客户端,就像在第一个例子。

一旦第一个页面被取出,我就知道总共会有多少结果,所以我可以在前面取出一个小页面,并将缓冲区大小设置为完整的结果大小,但这似乎是一个变通方法。

共有1个答案

沈博涉
2023-03-14

我发现UnfoldAsync非常适合这个用例。

def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]

就像unfold,但是fold函数返回一个future,它将导致源代码完成或在源代码完成时发出。

当有需求时发出,且展开状态返回,未来完成时具有某些值

当展开函数返回的future以空值完成时完成

 类似资料:
  • 是否可以通过 Swagger 描述 Websocket server api? 我们通过websocket服务器广播事件,我想通过一个swagger规范文件来描述它们。 是否有用于WebSocket的文档生成器?

  • 我有一个来自URL的分页响应,我想继续点击我从上一个响应中获得的下一页URL,并继续收集项目,直到我的响应中没有“nextPage”URL。如何使用WebFlux的Spring引导WebClient在没有阻塞的情况下以反应式方式实现这一点? 在这里,我创建了模拟网址https://karthikdivi.com/apps/paginatedReviews/withNextPageTokens/it

  • 我有一个.json文件,我正试图将其作为文本字符串导入。 这是确切的文件内容: 我试图用这个导入它: 但是,作为一个数组出现,我不希望它出现-我只希望文本字符串,就像它在文件中一样。 我该如何实现这一点? 所以我的全部代码是: 但现在我看到下面的截图: 我想我想退出的是。为什么要将所有这些附加信息添加到我的文件中?我做错了什么? 当有人花时间向我解释一些事情,而我却不明白时,我总是感到很难过。所以

  • 我有一个终点 api响应看起来像 我正在尝试使用SpringV2在另一个微服务中使用该endpoint。5.0 via 的实现遵循这篇文章来处理分页响应。 然而,当运行我的应用程序时,我得到了这个错误 如何正确使用此终结点?

  • 问题内容: 我正在使用一些客户端JavaScript代码通过HTTP GET从Web服务器提取大量JSON数据。数据量可能很大,例如50 MB。这是在LAN上,因此这不是什么大问题,但仍然需要十秒钟左右。 为了使我的界面更具响应性,我想分块处理响应,并在UI可用时立即在UI中显示数据(例如,每MB或每秒)。浏览器兼容性不是问题。只要它可以在最新的Chrome和Firefox上运行,就可以了。但是,

  • 我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt