尝试使用akka、rx、vert建立一个反应系统。x和mongodb中的流(下游)看起来很像:
<代码>发布者-
我遇到了这样一种情况,上游在mongo上调用了太多的更新收集操作,结果是:
<代码>com.mongodb.MongoWaitQueueFullException:等待连接的线程太多。已超过500个最大线程数(maxWaitQueueSize)
更新集合:
>
在同一集合上执行
1) 由于这是一个反应式系统,最好对源代码施加反压力,只执行尽可能频繁的http请求,因为这样可以写入mongo,这样等待连接mongo的线程队列就不会增长,
对于mongo的这种背压,有什么模式/例子可以遵循,或者我应该自己发明和实现它吗?
如何通过vertx mongo客户端访问和观察等待连接的线程数?
2) 在查看vertx mongo客户端的代码时,发现它没有保持与mongo的连接处于打开状态,并且每次更新操作都会打开新会话。您可以在io中观察到这一点。vertx。外景:mongo。实施。MongoClientImpl。updateCollection(…) 它下面调用MongoCollection的位置。updateOne(…)而不传递ClientSession ClientSession参数。
虽然每秒10次并发更新看起来很少,但问题是-可能是ClientSession
的创建需要很长时间,因此导致线程排队的情况吗?
另外,在vertx mongo客户端中不缓存与mongo的连接的设计决策是什么?
mongostat
是:
insert query update delete getmore command dirty used flushes vsize res qrw arw net_in net_out conn time
*0 *0 32 *0 0 139|0 9.8% 10.8% 0 1.84G 666M 0|0 1|89 29.6k 62.9k 104 Jan 19 02:33:51.980
*0 *0 6 *0 0 4|0 18.7% 18.7% 0 2.90G 1.50G 0|0 1|100 2.41k 9.59k 104 Jan 19 02:33:59.342
*0 *0 *0 *0 0 2|0 15.7% 17.2% 0 3.52G 1.60G 0|0 1|97 493b 7.90k 104 Jan 19 02:34:07.480
*0 *0 9 *0 0 3|0 14.7% 17.2% 0 3.52G 1.57G 0|0 1|100 3.10k 18.7k 104 Jan 19 02:34:10.955
*0 *0 1 *0 0 1|0 21.4% 23.1% 0 3.52G 1.57G 0|0 1|100 749b 7.46k 104 Jan 19 02:34:19.579
*0 *0 10 *0 0 16|0 36.7% 37.4% 0 3.57G 1.57G 0|0 1|100 4.79k 73.7k 104 Jan 19 02:34:20.443
*0 *0 *0 *0 0 9|0 53.6% 54.0% 0 3.62G 1.56G 0|0 1|100 1.33k 47.6k 104 Jan 19 02:34:21.769
*0 *0 1 *0 0 13|0 54.5% 55.2% 0 3.62G 1.57G 0|0 1|100 1.92k 70.6k 104 Jan 19 02:34:22.659
*0 *0 *0 *0 0 23|0 70.5% 70.9% 0 3.62G 1.56G 0|0 1|100 2.75k 122k 104 Jan 19 02:34:23.173
*0 *0 *0 *0 0 31|0 72.1% 72.5% 0 3.62G 1.58G 0|0 1|100 3.56k 153k 104 Jan 19 02:34:23.586
我非常感谢你的帮助。
我不认为你需要一个backpreassure,因为如果你一直得到的比你处理的更多,那么在你的情况下,它将分配所有内存和课程例外,但我认为选择是创建en emitter,其中每个流将项目发射到此发射器,并使用mongorepo。全部保存(发射器):
进程1-
进程2-
进程3-
处理器4-
处理器5-
处理器6-
处理器7-
处理器8-
处理器9-
处理器10-
代码如下 我已经创建了一个DynamoDB表我的主键是,它是字符串。 在DynamoDB中,的表值是我需要更新为David。上面是代码。为什么错误会抛出元模式 完整错误如下所示 “errorMessage”:“调用UpdateItem操作时发生错误(ValidationException):更新表达式中提供的文档路径对于更新无效”,“errorType”:“ClientError”, 在代码下面尝
问题内容: 我只能在用户的套接字ID直接存储在io.sockets.on(’connect’)函数中时向用户发出消息。我不知道为什么在登录后尝试存储其套接字ID时为什么不起作用。 加工: 无法运作: JavaScript客户端代码段 解决方案:感谢@alessioalex, 我不得不从登录页面中删除对socket.io的引用,并将以下内容添加到io.sockets.on(’connection’)
我想创建. zip文件,其中包含我从后端收到的压缩文件,然后将此文件发送给用户。两天来,我一直在寻找答案,但找不到合适的解决方案,也许你可以帮我:) 目前,代码是这样的:(我知道我不应该在spring控制器中完成所有工作,但不关心这一点,它只是为了测试目的,以找到使其工作的方法) 但问题是,使用的代码,当我输入URL:localhost:8080/zip我得到文件:test.zip.html而不是
我后来理解对了。实际上,我需要一条来自android客户端的MQTT消息发送到所有其他客户端,所以我想在消息正文中包含publish关键字,这是非常错误的。MQTT本身将接收到的消息发送给所有提供的客户端,如果客户端订阅了该主题的话。
我现在用C/C做并发套接字编程。我只是让服务器接收来自客户端的请求,并将响应数据包发送给客户端。我使用一个线程来接收来自客户端的请求。当服务器得到一个新请求时,一个新的线程将被创建,以便向客户端发送一些数据包。然而,当我的服务器向该特定客户端发送数据包时,我的客户端的recvfrom总是返回winsock错误10054。
我正在尝试通过 tcp 连接远程执行程序,我想在客户端之间实时共享标准输出和标准输出 我有以下没有错误处理的测试服务器:p我知道,目前我无法执行带有参数的程序,但这很容易处理:) 你看,我尝试与 c.Write() 共享标准输出,但这不起作用。 我认为cmd.Stdin的另一个问题将与Stdout的问题相同。此时我没有实现任何标准函数。 有人能给我一个关于这个函数的提示或示例代码吗?