当前位置: 首页 > 知识库问答 >
问题:

正确处理来自异步调用的反压力

乐修远
2023-03-14

我有一些大致看起来像这样的代码:

Completable handle(Observable<Object> inbound) {
  return inbound.buffer(1, TimeUnit.Second, 250)
    .filter(l -> !l.isEmpty())
    .flatMapToCompletable(this::sendToClient);
}

Completable sendToClient(List<Object> list) {
  // call IO asynchronously
}

入站是一个冷可观察的,它在标准的同时循环中产生(但是在另一个线程上!)。在未配置时,获取一些数据并调用onNext()

我面临的问题是sendToClient太“慢”,它跟不上inbound生成数据的速度。这是一个异步操作,本质上是将对内存中某个位置的客户机的调用排队(无法访问此代码),并最终填充堆。

我需要找到一种方法,当这些调用开始堆积时,让入站停止生成。有没有一种方法可以使用内置操作符来实现这一点?

共有1个答案

公良琛
2023-03-14

我可能会建议创建一个线程池来处理sendToClient操作。这可以支持等待处理的请求队列,但也可以指定队列已满时的行为(即背压过大)。

看看java.util.concurrent.线程池执行器

 类似资料:
  • 问题内容: 我有一个库,其中为我们的客户提供了两种方法,sync和async。他们可以调用他们认为适合其目的的任何方法。 executeSynchronous()-等待直到得到结果,然后返回结果。 executeAsynchronous()-立即返回一个Future,如果需要,可以在完成其他操作之后进行处理。 他们将传递其中包含用户ID的DataKey对象。然后,我们将根据用户ID确定要调用的计算

  • 我正在开发一个Spring Boot REST API,它可以处理文档并对文档启动检查。 我有一个文档资源:: 使用创建文档 的CRUD操作的其余部分 用户应该通过上的查询参数选择同步还是异步检查? 是否应该创建2个单独的路径? 同样,在异步检查的情况下,我将创建一个临时任务资源,可以将其池化以了解检查的状态。 但是,如果check和task都从相同的路径返回,就会变得混乱,不是吗? 您将如何处理

  • 问题内容: 假设我有某种游戏。我有一个buyItem函数,如下所示: 如果我对该路由进行垃圾邮件处理,直到扣除用户余额(第二次查询),则用户余额仍为正。 我尝试过的 问题是将在第一〜5项要求。因此,这也不起作用。 我们如何处理这种情况?如果重要的话,我正在使用Sails.JS框架。 问题答案: 通过该方法,Sails 1.0现在具有完整的事务支持。例: 更新资料 正如一些评论者所指出的,启用连接池

  • 我有一个actor可以从外部系统(UDP/TCP)接收消息。根据传入数据的内容,有时我希望actor回调代码的非AKA部分。 换句话说,我不想用调用和actor并等待一些传入数据,而是异步回调。 我如何实现这一点而不关闭调用对象(在创建ActorRef时会在回调中传递琐碎的内容,但这会捕获调用者)?

  • 问题内容: 现在,我有几次遇到使用Firebase的同步和异步功能的问题。我的问题通常是我需要在我编写的函数中进行异步Firebase调用。作为一个简单的示例,假设我需要计算并显示对象的速度,而我的Firebase存储距离和时间: 当然,上述代码将无法使用,因为它是异步调用,因此在到达时尚未设置。如果我们将内部放置在回调函数中,则什么也不会返回。 一种解决方案是使我的函数也异步。 另一种解决方案是

  • 问题内容: 我对Node.js并不陌生,尽管我通常对JavaScript非常熟悉。我的问题是关于如何处理node.js中的错误的“最佳实践”。 通常,在对Web服务器,FastCGI服务器或各种语言的网页进行编程时,我在多线程环境中使用带有阻止处理程序的Exceptions。当有请求进入时,我通常会执行以下操作: 这样,我始终可以处理内部错误并向用户发送有效响应。 我知道使用node.js可以执行