当前位置: 首页 > 知识库问答 >
问题:

到达服务器的消息在源到达其绑定后实现流处理会发生什么?

郜卓君
2023-03-14

我正在学习akka流,但显然它与任何流媒体框架都相关:)

引用akka文件:

Reactive Streams只是定义了一种通用机制,说明如何在不丢失、缓冲或资源耗尽的情况下跨异步边界移动数据

现在,据我所知,如果在streams之前,让我们以http服务器为例,当接收方没有完成请求时,请求就会出现,因此即将出现的新请求将收集在一个缓冲区中,该缓冲区将保存等待的请求,还有一个问题是,这个缓冲区的大小未知,如果服务器过载,我们可能会释放等待的请求。

因此,然后流处理开始发挥作用,他们将此缓冲区限制为可控的......因此,我们可以预定义我们想要排队的消息(在我的示例中的请求)的数量,并且我们可以一次处理每个消息。

我的问题是,如果我们实现服务器中的一个源最多可以有3条消息,那么如果第4个id来了会发生什么?

我的意思是当另一个服务器呼叫我们时,我们已经处理了3个请求。。。他的请求会怎么样?

共有1个答案

姬银龙
2023-03-14

您所描述的实际上并不是Reactive Streams实现解决的主要问题。

请求数量方面的背压通过常规网络工具解决。例如,在Java中,您可以将网络库(例如Netty)的线程池配置为某种并行级别,该库将负责接受尽可能多的请求。或者,如果使用同步套接字API,就更简单了——可以推迟在服务器套接字上调用accept(),直到为当前连接的所有客户端提供服务。在任何一种情况下,都没有“缓冲区”,只有在服务器接受连接之前,客户端才会被阻止(无论是在系统调用中阻止API,还是在异步API的事件循环中)。

反应流实现解决的是如何在更高级别的数据管道中处理背压。反应流实现(例如akka streams)提供了一种构建数据管道的方法,在这种方法中,当数据的使用者速度较慢时,生产者也会自动减速,这将在任何类型的底层传输中起作用,无论是HTTP、WebSockets、原始TCP连接还是进程内消息传递。

例如,考虑一个简单的WebSocket连接,其中客户端发送一个连续的信息流(例如来自某个传感器的数据),服务器将此数据写入某个数据库。现在假设服务器端的数据库由于某种原因(网络问题、磁盘过载等等)变得很慢。服务器现在无法跟上客户端发送的数据,也就是说,它无法在新数据到达之前及时将其保存到数据库中。如果在整个管道中使用反应流实现,服务器将自动向客户端发出信号,表示无法处理更多数据,客户端将自动调整其生成速率,以避免服务器过载。

当然,这可以在没有任何反应流实现的情况下完成,例如通过手动控制确认。然而,与许多其他库一样,反应流实现为您解决了这个问题。它们还提供了一种定义此类管道的简单方法,并且通常具有用于各种外部系统(如数据库)的接口。特别是,这样的库可能会在最低级别上实现背压,直到TCP连接,这可能很难手动实现。

至于反应流本身,它只是对可由库实现的API的描述,定义了常见的术语和行为,并允许此类库可互换或轻松交互,例如,您可以使用规范中的接口将akka Streams管道连接到Monix管道,组合管道将无缝工作,并支持反应流的所有背压特性。

 类似资料:
  • 我通过自定义管理的KMS密钥有一个加密的SQS队列和SNS主题。目前,我正在使用下面链接中所述的类似类型的SQS策略,它可以正常工作SQS策略 但是如果我使用下面的SQS策略,它就不起作用了。出于安全原因,我不想将主体设置为“*”。有人能解释一下为什么会发生这种情况吗

  • 当消息到达SQS时,如何触发AWS Lambda函数(Python),并将消息传递给HTTPendpoint,在处理API中的数据后,从SQS队列中删除消息?如何在Python lambda中实现这一点?

  • 我设置了一个EJB项目,使用JMS将持久性实体对象发送到MDB。我使用JBoss EAP 7,使用Apache ActiveMQ作为消息传递提供程序。我像这样设置ConnectionFactory和队列: 这是我的消息生成器,它接收“Account”实体对象作为参数并将其发送到队列: EntityEnqueueBean。Java语言 MDB从队列接收消息并对其进行处理: java账户 不确定我做错

  • 这段代码适用于发送数据并关闭连接的客户机,但是当使用一个连接的客户机多次发送时,数据没有被读取->我应该在读取完整正文后关闭连接吗?

  • 但是,如果我的Lambda不期望任何输入,它将自己转到SQS并拉出消息,有输入有意义吗?我是否可以让它无效,或者甚至完全使用其他方法签名(当然,在本例中不实现那个接口)?

  • 问题内容: 当我运行多线程代码时,系统(linux)有时会将线程从一个处理器移动到另一个处理器。由于我拥有与处理器一样多的线程,因此它没有充分的理由使缓存无效,并且使我的跟踪活动混乱。 您知道如何将线程绑定到处理器,为什么系统会这样做? 问题答案: 使用(这是特定于Linux的)。 为什么调度程序会在不同处理器之间切换线程?好吧,假设您的线程最后一次在处理器1上运行,并且当前正在等待再次安排执行时