在服务器端,我正在使用一个HTTP API,它以页面形式返回结果。如中所示,响应包含x个结果,如果有超过0的结果,我可以再次调用它以获得下一个x个结果。x可以任意选择直到API的最大页面大小。
现在,我想要在WebSocket上高效地流式传输全套结果,而不会使它不堪重负(施加背压)。最初,我构建了整个resultset,然后从中创建了一个源代码:
getEventsFuture().foreach { events =>
sender ! Flow.fromSinkAndSource(Sink.ignore, Source(events))
}
这样可以工作,WebSocket客户机以其最大速度接收所有事件。这样做的最大缺点是,在开始向客户机返回数据之前,我必须获取所有页面。理想情况下,我会使用较小的页面大小,并在连接后立即将结果返回给客户机,获取该过程中的下一个页面。
因此,我需要一个流,我可以添加数据的源后,流已经物化。我尝试使用source.actorref
:
val events = Source.actorRef[Event](1000, OverflowStrategy.fail).mapMaterializedValue { outActor =>
sendEvents(outActor)
NotUsed
}
sender ! Flow.fromSinkAndSource(Sink.ignore, events)
本质上,我使用物化的actorRef并将所有事件发送给它。每次提取页面时,我都会将结果转储给执行元。现在,我对源代码的初始化可能已经告诉您,这并不总是起作用。有时,当响应足够大,而客户端没有像其他时候那样迅速地消耗时,套接字连接就会关闭。我觉得overflowstrategy.fail
是反对丢弃事件的正确策略,因为我不希望客户机认为他们得到了所有东西,如果不是这样的话。
我没有要预先为缓冲区设置的sane值,我不想设置int.max或其他东西,因为我认为Akka内部确实为缓冲区大小分配了全部内存。
我该怎么解决?我希望所有的事件尽可能快地和适当的背压到客户端,就像在第一个例子。
一旦第一个页面被取出,我就知道总共会有多少结果,所以我可以在前面取出一个小页面,并将缓冲区大小设置为完整的结果大小,但这似乎是一个变通方法。
我发现UnfoldAsync
非常适合这个用例。
def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]
就像unfold
,但是fold函数返回一个future
,它将导致源代码完成或在源代码完成时发出。
当有需求时发出,且展开状态返回,未来完成时具有某些值
当展开函数返回的future以空值完成时完成
是否可以通过 Swagger 描述 Websocket server api? 我们通过websocket服务器广播事件,我想通过一个swagger规范文件来描述它们。 是否有用于WebSocket的文档生成器?
我有一个来自URL的分页响应,我想继续点击我从上一个响应中获得的下一页URL,并继续收集项目,直到我的响应中没有“nextPage”URL。如何使用WebFlux的Spring引导WebClient在没有阻塞的情况下以反应式方式实现这一点? 在这里,我创建了模拟网址https://karthikdivi.com/apps/paginatedReviews/withNextPageTokens/it
我有一个.json文件,我正试图将其作为文本字符串导入。 这是确切的文件内容: 我试图用这个导入它: 但是,作为一个数组出现,我不希望它出现-我只希望文本字符串,就像它在文件中一样。 我该如何实现这一点? 所以我的全部代码是: 但现在我看到下面的截图: 我想我想退出的是。为什么要将所有这些附加信息添加到我的文件中?我做错了什么? 当有人花时间向我解释一些事情,而我却不明白时,我总是感到很难过。所以
我有一个终点 api响应看起来像 我正在尝试使用SpringV2在另一个微服务中使用该endpoint。5.0 via 的实现遵循这篇文章来处理分页响应。 然而,当运行我的应用程序时,我得到了这个错误 如何正确使用此终结点?
问题内容: 我正在使用一些客户端JavaScript代码通过HTTP GET从Web服务器提取大量JSON数据。数据量可能很大,例如50 MB。这是在LAN上,因此这不是什么大问题,但仍然需要十秒钟左右。 为了使我的界面更具响应性,我想分块处理响应,并在UI可用时立即在UI中显示数据(例如,每MB或每秒)。浏览器兼容性不是问题。只要它可以在最新的Chrome和Firefox上运行,就可以了。但是,
我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt