我在这里研究了几个与“太多客户”相关的话题,但仍然无法解决我的问题,所以我必须再次询问这个问题,针对我的具体案例。
基本上,我设置了我的本地Postgres服务器,需要做数万个查询,所以我使用了Python的心理包。这是我的代码:
import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor
df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times
DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)
def do_one_query(inputS, inputT):
conn = tcp.getconn()
c = conn.cursor()
q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"
c.execute(q)
all_results = c.fetchall()
for row in all_results:
return row
tcp.putconn(conn, close=True)
cnt=0
for idx, row in df.iterrows():
cnt+=1
with ThreadPoolExecutor(max_workers=1) as pool:
ret = pool.submit(do_one_query, row["S"], row["T"])
print ret.result()
print cnt
代码使用小df运行良好。如果我重复df 10000次,我得到错误消息说连接池耗尽。我认为我使用的连接已经被这一行关闭:
tcp.putconn(conn,close=True),但我想它们实际上并没有关闭?我怎样才能避开这个问题?
这里的问题是,您实际上没有返回到池的连接,而是使用
tcp.putconn(conn, close=True)
请参阅此处的文档http://initd.org/psycopg/docs/pool.html
If close is True, discard the connection from the pool.
因此,如果您将800个连接放入池中,在801个循环之后,您将得到“耗尽的错误”,因为您的连接池大小为零。
您需要使用池顶上的队列。
类似于以下的方法应该可以工作:
import gevent, sys, random, psycopg2, logging
from contextlib import contextmanager
from gevent.queue import Queue
from gevent.socket import wait_read, wait_write
from psycopg2.pool import ThreadedConnectionPool
from psycopg2 import extensions, OperationalError
import sys
logger = logging.getLogger(__name__)
poolsize = 100 #number of max connections
pdsn = '' # put your dsn here
if sys.version_info[0] >= 3:
integer_types = (int,)
else:
import __builtin__
integer_types = (int, __builtin__.long)
class ConnectorError(Exception):
""" This is a base class for all CONNECTOR related exceptions """
pass
#simplified calls etc. db.fetchall(SQL, arg1, arg2...)
def cursor(): return Pcursor()
def fetchone(PSQL, *args): return Pcursor().fetchone(PSQL, *args)
def fetchall(PSQL, *args): return Pcursor().fetchall(PSQL, *args)
def execute(PSQL, *args): return Pcursor().execute(PSQL, *args)
#singleton connection pool, gets reset if a connection is bad or drops
_pgpool = None
def pgpool():
global _pgpool
if not _pgpool:
try:
_pgpool = PostgresConnectionPool(maxsize=poolsize)
except psycopg2.OperationalError as exc:
_pgpool = None
return _pgpool
class Pcursor(object):
def __init__(self, **kwargs):
#in case of a lost connection lets sit and wait till it's online
global _pgpool
if not _pgpool:
while not _pgpool:
try:
pgpool()
except:
logger.debug('Attempting Connection To Postgres...')
gevent.sleep(1)
def fetchone(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
logger.debug(cursor.query)
return cursor.fetchone()
def fetchall(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
logger.debug(cursor.query)
return cursor.fetchall()
def execute(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
finally:
logger.debug(cursor.query)
return cursor.query
def fetchmany(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
while 1:
items = cursor.fetchmany()
if not items:
break
for item in items:
yield item
class AbstractDatabaseConnectionPool(object):
def __init__(self, maxsize=poolsize):
if not isinstance(maxsize, integer_types):
raise TypeError('Expected integer, got %r' % (maxsize, ))
self.maxsize = maxsize
self.pool = Queue()
self.size = 0
def create_connection(self):
#overridden by PostgresConnectionPool
raise NotImplementedError()
def get(self):
pool = self.pool
if self.size >= self.maxsize or pool.qsize():
return pool.get()
self.size += 1
try:
new_item = self.create_connection()
except:
self.size -= 1
raise
return new_item
def put(self, item):
self.pool.put(item)
def closeall(self):
while not self.pool.empty():
conn = self.pool.get_nowait()
try:
conn.close()
except Exception:
pass
@contextmanager
def connection(self, isolation_level=None):
conn = self.get()
try:
if isolation_level is not None:
if conn.isolation_level == isolation_level:
isolation_level = None
else:
conn.set_isolation_level(isolation_level)
yield conn
except:
if conn.closed:
conn = None
self.closeall()
raise
else:
if conn.closed:
raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
finally:
if conn is not None and not conn.closed:
if isolation_level is not None:
conn.set_isolation_level(isolation_level)
self.put(conn)
@contextmanager
def cursor(self, *args, **kwargs):
isolation_level = kwargs.pop('isolation_level', None)
with self.connection(isolation_level) as conn:
try:
yield conn.cursor(*args, **kwargs)
except:
global _pgpool
_pgpool = None
del(self)
class PostgresConnectionPool(AbstractDatabaseConnectionPool):
def __init__(self,**kwargs):
try:
self.pconnect = ThreadedConnectionPool(1, poolsize, dsn=pdsn)
except:
global _pgpool
_pgpool = None
raise ConnectorError('Database Connection Failed')
maxsize = kwargs.pop('maxsize', None)
self.kwargs = kwargs
AbstractDatabaseConnectionPool.__init__(self, maxsize)
def create_connection(self):
self.conn = self.pconnect.getconn()
self.conn.autocommit = True
return self.conn
def gevent_wait_callback(conn, timeout=None):
"""A wait callback useful to allow gevent to work with Psycopg."""
while 1:
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
wait_read(conn.fileno(), timeout=timeout)
elif state == extensions.POLL_WRITE:
wait_write(conn.fileno(), timeout=timeout)
else:
raise ConnectorError("Bad result from poll: %r" % state)
extensions.set_wait_callback(gevent_wait_callback)
然后,您可以通过以下方式呼叫您的连接:
import db
db.Pcursor().execute(PSQL, arg1, arg2, arg3)
基本上,我借用了async postgres的gevent示例,并对其进行了修改,以支持通过pyplog2进行线程池。
https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py
我在模块中添加了心理学绿色的功能,所以您所需要做的就是导入和调用类。对类的每次调用都会在队列上堆叠一个新查询,但只使用特定大小的池。这样你就不会用完连接。这基本上类似于PGBounser所做的,我认为这也会消除你的问题。
https://pgbouncer.github.io/
我一直在努力寻找关于ThreadedConnectionPool如何工作的详细信息。https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html 这并不坏,但事实证明,它声称getconn会阻塞直到连接可用的说法是不正确的。检查代码时,all ThreadedConnectionPool添加的是AbstractConnectionPool方法周围的锁,以防止竞争条件。如果在任何一点尝试使用多个maxconn连接,将引发连接池错误。
如果您想要比接受的答案稍微简单一点的方法,那么进一步将方法包装在提供阻塞的信号量中,直到连接可用为止,应该可以做到这一点:
from psycopg2.pool import ThreadedConnectionPool
from threading import Semaphore
class ReallyThreadedConnectionPool(ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
self._semaphore = Semaphore(maxconn)
super().__init__(minconn, maxconn, *args, **kwargs)
def getconn(self, *args, **kwargs):
self._semaphore.acquire()
return super().getconn(*args, **kwargs)
def putconn(self, *args, **kwargs):
super().putconn(*args, **kwargs)
self._semaphore.release()
我有使用hikari池创建连接池的Spring启动应用程序。我们正在使用postgres sql用于db。当我以低qps命中系统时,请求需要大约200毫秒来执行。当部署一个pod并且qps为15时,事情保持良好状态。但是一旦我将qps增加到20,请求就开始需要大约10秒来处理,连接池变空(java.sql.SQLTransientConntion异常:菲尼克斯-连接不可用,请求在30183毫秒后超
我目前在Azure中托管了几十个网站,最近开始在每个web应用的门户刀片中看到“内存资源耗尽”警告: 我在两个S3标准(大型)应用程序服务计划中托管我的网站,我在所有网站上都会收到警告,无论它们在哪个应用程序服务计划上。 有趣的是,当查看任一应用服务计划的内存使用率时,我总是低于40%,内存使用率实际上相当一致。我从未看到峰值或任何接近85%内存使用率的东西。 我的问题是,我是否误解了警告消息?是
在为了使用多线程而修改了一个服务方法之后,我发现如果不止一个用户多次尝试请求页面(并调用服务方法),服务器就会抛出“无法连接,池耗尽”异常。让我提供一个我的服务类的例子。 我已经在这个问题上挣扎了一个多星期,我找不到解决方案。我不太明白Grails如何与会话、连接和事务一起工作。我的猜测是跟随。当调用ConvertDocumentToJSON时,它从池中获取连接(4个用户,每个用户25个线程=10
我有业务托管在nkl.com,不幸的是我不能使用作曲家在那里了。支持写了更多,我已经达到了1.5 GB的限制,更多是不可能的。 现在我的问题是,有没有办法让Composer一块一块地更新或安装它,这样就可以释放中间的内存,或者有没有其他解决方案可以在服务器上运行我的Laravel应用程序? 目前,Composer JSON中包含以下包。还有一些计划。
根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。 因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。 是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?
我正在我的GTX 1060 6gb上使用Python中的Tensorflow 1.2训练LSTM。 在每个时代,我用这种方法保存模型: 一切正常,但在九个时代之后,当我试图用这种方法保存模型时,我得到了ResourceExhaustedError。 我在培训期间检查了我的资源,但没有耗尽任何资源。 我得到的错误如下: 2017-06-29 12:43:02.865845: W tenstorflo