当前位置: 首页 > 知识库问答 >
问题:

Vertx mongo客户端通过并发更新压倒mongodb

蓝星辰
2023-03-14

尝试使用akka、rx、vert建立一个反应系统。x和mongodb中的流(下游)看起来很像:

<代码>发布者-

我遇到了这样一种情况,上游在mongo上调用了太多的更新收集操作,结果是:

<代码>com.mongodb.MongoWaitQueueFullException:等待连接的线程太多。已超过500个最大线程数(maxWaitQueueSize)

更新集合:

>

在同一集合上执行

正在向文档中嵌入数组中添加新元素

1) 由于这是一个反应式系统,最好对源代码施加反压力,只执行尽可能频繁的http请求,因为这样可以写入mongo,这样等待连接mongo的线程队列就不会增长,

对于mongo的这种背压,有什么模式/例子可以遵循,或者我应该自己发明和实现它吗?

如何通过vertx mongo客户端访问和观察等待连接的线程数?

2) 在查看vertx mongo客户端的代码时,发现它没有保持与mongo的连接处于打开状态,并且每次更新操作都会打开新会话。您可以在io中观察到这一点。vertx。外景:mongo。实施。MongoClientImpl。updateCollection(…) 它下面调用MongoCollection的位置。updateOne(…)而不传递ClientSession ClientSession参数。

虽然每秒10次并发更新看起来很少,但问题是-可能是ClientSession的创建需要很长时间,因此导致线程排队的情况吗?

另外,在vertx mongo客户端中不缓存与mongo的连接的设计决策是什么?

mongostat是:

insert query update delete getmore command dirty  used flushes vsize   res qrw  arw net_in net_out conn                time
    *0    *0     32     *0       0   139|0  9.8% 10.8%       0 1.84G  666M 0|0 1|89  29.6k   62.9k  104 Jan 19 02:33:51.980
    *0    *0      6     *0       0     4|0 18.7% 18.7%       0 2.90G 1.50G 0|0 1|100  2.41k   9.59k  104 Jan 19 02:33:59.342
    *0    *0     *0     *0       0     2|0 15.7% 17.2%       0 3.52G 1.60G 0|0  1|97   493b   7.90k  104 Jan 19 02:34:07.480
    *0    *0      9     *0       0     3|0 14.7% 17.2%       0 3.52G 1.57G 0|0 1|100  3.10k   18.7k  104 Jan 19 02:34:10.955
    *0    *0      1     *0       0     1|0 21.4% 23.1%       0 3.52G 1.57G 0|0 1|100   749b   7.46k  104 Jan 19 02:34:19.579
    *0    *0     10     *0       0    16|0 36.7% 37.4%       0 3.57G 1.57G 0|0 1|100  4.79k   73.7k  104 Jan 19 02:34:20.443
    *0    *0     *0     *0       0     9|0 53.6% 54.0%       0 3.62G 1.56G 0|0 1|100  1.33k   47.6k  104 Jan 19 02:34:21.769
    *0    *0      1     *0       0    13|0 54.5% 55.2%       0 3.62G 1.57G 0|0 1|100  1.92k   70.6k  104 Jan 19 02:34:22.659
    *0    *0     *0     *0       0    23|0 70.5% 70.9%       0 3.62G 1.56G 0|0 1|100  2.75k    122k  104 Jan 19 02:34:23.173
    *0    *0     *0     *0       0    31|0 72.1% 72.5%       0 3.62G 1.58G 0|0 1|100  3.56k    153k  104 Jan 19 02:34:23.586

我非常感谢你的帮助。

共有1个答案

葛季萌
2023-03-14

我不认为你需要一个backpreassure,因为如果你一直得到的比你处理的更多,那么在你的情况下,它将分配所有内存和课程例外,但我认为选择是创建en emitter,其中每个流将项目发射到此发射器,并使用mongorepo。全部保存(发射器):

进程1-

进程2-

进程3-

处理器4-

处理器5-

处理器6-

处理器7-

处理器8-

处理器9-

处理器10-

 类似资料:
  • 代码如下 我已经创建了一个DynamoDB表我的主键是,它是字符串。 在DynamoDB中,的表值是我需要更新为David。上面是代码。为什么错误会抛出元模式 完整错误如下所示 “errorMessage”:“调用UpdateItem操作时发生错误(ValidationException):更新表达式中提供的文档路径对于更新无效”,“errorType”:“ClientError”, 在代码下面尝

  • 问题内容: 我只能在用户的套接字ID直接存储在io.sockets.on(’connect’)函数中时向用户发出消息。我不知道为什么在登录后尝试存储其套接字ID时为什么不起作用。 加工: 无法运作: JavaScript客户端代码段 解决方案:感谢@alessioalex, 我不得不从登录页面中删除对socket.io的引用,并将以下内容添加到io.sockets.on(’connection’)

  • 我想创建. zip文件,其中包含我从后端收到的压缩文件,然后将此文件发送给用户。两天来,我一直在寻找答案,但找不到合适的解决方案,也许你可以帮我:) 目前,代码是这样的:(我知道我不应该在spring控制器中完成所有工作,但不关心这一点,它只是为了测试目的,以找到使其工作的方法) 但问题是,使用的代码,当我输入URL:localhost:8080/zip我得到文件:test.zip.html而不是

  • 我后来理解对了。实际上,我需要一条来自android客户端的MQTT消息发送到所有其他客户端,所以我想在消息正文中包含publish关键字,这是非常错误的。MQTT本身将接收到的消息发送给所有提供的客户端,如果客户端订阅了该主题的话。

  • 我现在用C/C做并发套接字编程。我只是让服务器接收来自客户端的请求,并将响应数据包发送给客户端。我使用一个线程来接收来自客户端的请求。当服务器得到一个新请求时,一个新的线程将被创建,以便向客户端发送一些数据包。然而,当我的服务器向该特定客户端发送数据包时,我的客户端的recvfrom总是返回winsock错误10054。

  • 我正在尝试通过 tcp 连接远程执行程序,我想在客户端之间实时共享标准输出和标准输出 我有以下没有错误处理的测试服务器:p我知道,目前我无法执行带有参数的程序,但这很容易处理:) 你看,我尝试与 c.Write() 共享标准输出,但这不起作用。 我认为cmd.Stdin的另一个问题将与Stdout的问题相同。此时我没有实现任何标准函数。 有人能给我一个关于这个函数的提示或示例代码吗?