最近使用pyspider作为调度框架部署一个项目,客户方要求使用redis集群作为消息队列。在网上搜索了很多资料,都说pyspider不支持redis集群。静下心来一想,这么常规的需求不应该不支持。本着"一切都在源码中"的宗旨,打开了pyspider的源码,果然发现它其实是支持redis集群的;但是如果redis集群需要密码认证,就不支持了。因此,需要对pyspider的代码做一个改造,让它支持带认证的redis集群。
首先看redis_queue.py文件中,我们可以看到RedisQueue类的构造函数中是有关于StrictRedisCluster的信息的,说明pyspider是支持redis集群模式的。和单点的redis连接相比,其中并没有关于redis集群的密码认证的代码。
class RedisQueue(object):
    """
    Message queue with a Queue-like interface, stored in redis
    """

    Empty = BaseQueue.Empty
    Full = BaseQueue.Full
    max_timeout = 0.3

    def __init__(self, name, host='localhost', port=6379, db=0,
                 maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):
        """
        Create the redis client backing this queue.

        name: queue name, kept on the instance as ``self.name``
        host, port, db, password: connection settings, used only for the
            single-node client
        maxsize: upper bound on the number of items that may be placed in
            the queue
        lazy_limit: the redis queue is shared between instances, so a lazy
            size limit is used for better performance
        cluster_nodes: when not None, handed to StrictRedisCluster as
            ``startup_nodes``; host, port, db and password are not used on
            that path
        """
        self.name = name
        self.maxsize = maxsize
        self.lazy_limit = lazy_limit
        self.last_qsize = 0

        if cluster_nodes is None:
            # single-node redis, optionally password protected
            self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
        else:
            # cluster client is imported only when cluster mode is requested
            from rediscluster import StrictRedisCluster
            self.redis = StrictRedisCluster(startup_nodes=cluster_nodes)
首先抛开redis集群的密码认证不说,如果是没有密码认证的集群,我们应该怎么配置呢?我们知道,在运行pyspider的时候,使用的命令是:
pyspider -c pyspider.json
这种方式。也就是说我们程序的入口就在pyspider命令中。
跟踪代码就会发现,与消息队列初始化有关的操作在run.py文件中,代码如下:
# NOTE(review): abridged excerpt -- the `def` line of the decorated function and
# the loop that binds `db` are not shown here.
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
kwargs['is_%s_default' % db] = True
# create folder for counter.dump
if not os.path.exists(kwargs['data_path']):
os.mkdir(kwargs['data_path'])
# message queue, compatible with old version
# An explicit 'message_queue' setting wins; otherwise the url is derived from
# 'amqp_url', the RABBITMQ_* environment variables, or 'beanstalk', in that order.
if kwargs.get('message_queue'):
pass
elif kwargs.get('amqp_url'):
kwargs['message_queue'] = kwargs['amqp_url']
elif os.environ.get('RABBITMQ_NAME'):
# build an amqp url from the RABBITMQ_PORT_5672_TCP_* environment variables
kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
elif kwargs.get('beanstalk'):
kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']
# one queue per name, stored back into kwargs under that name
for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
'fetcher2processor', 'processor2result'):
if kwargs.get('message_queue'):
# `name=name` binds the current loop value as the lambda's default argument;
# presumably utils.Get defers the connection until first use -- TODO confirm
kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
else:
# no url configured: connect_message_queue is called directly with a falsy url
kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
kwargs['queue_maxsize'])
connect_message_queue 方法是非常关键的一个函数,继续跟踪,就会发现这个函数在message_queue包下面的__init__.py文件中,定义如下:
def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):
    """
    create connection to message queue

    name:
        name of message queue
    url:
        connection url, its scheme selects the backend (formats below)
    maxsize:
        forwarded to the selected Queue class
    lazy_limit:
        forwarded to the rabbitmq, redis and kombu Queue classes

    rabbitmq:
        amqp://username:password@host:5672/%2F
        see https://www.rabbitmq.com/uri-spec.html
    beanstalk:
        beanstalk://host:11300/
    redis:
        redis://host:6379/db
        redis://:password@host:6379/db
        redis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)
        (credentials are not parsed for the cluster form)
    kombu:
        kombu+transport://userid:password@hostname:port/virtual_host
        see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
    builtin:
        None

    Raises Exception when the url matches none of the schemes above.
    """
    if not url:
        from pyspider.libs.multiprocessing_queue import Queue
        return Queue(maxsize=maxsize)

    parsed = urlparse.urlparse(url)
    if parsed.scheme == 'amqp':
        from .rabbitmq import Queue
        return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
    elif parsed.scheme == 'beanstalk':
        from .beanstalk import Queue
        return Queue(name, host=parsed.netloc, maxsize=maxsize)
    elif parsed.scheme == 'redis':
        from .redis_queue import Queue
        if ',' in parsed.netloc:
            # redis in cluster mode (there is no concept of 'db' in cluster mode)
            # ex. redis://host1:port1,host2:port2,...,hostn:portn
            cluster_nodes = []
            for netloc in parsed.netloc.split(','):
                parts = netloc.split(':')
                cluster_nodes.append({'host': parts[0], 'port': int(parts[1])})
            return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit,
                         cluster_nodes=cluster_nodes)
        else:
            db = parsed.path.lstrip('/').split('/')
            try:
                db = int(db[0])
            except ValueError:
                # missing or non-numeric db segment: fall back to db 0
                logging.warning('redis DB must zero-based numeric index, using 0 instead')
                db = 0
            password = parsed.password or None
            return Queue(name=name, host=parsed.hostname, port=parsed.port, db=db,
                         maxsize=maxsize, password=password, lazy_limit=lazy_limit)
    elif url.startswith('kombu+'):
        # strip the 'kombu+' marker, the remainder is passed on as the url
        url = url[len('kombu+'):]
        from .kombu_queue import Queue
        return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
    else:
        # format the url into the message instead of passing it as a second arg
        raise Exception('unknown connection url: %s' % url)
由上面的代码我们就可以非常清楚的知道,如果使用redis集群模式时,首先要做的就是两件事:
pip install rediscluster
{
"message_queue": "redis://192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}
节点与节点之间使用“,”分隔开。
在上面的代码中,我们可以看到如果是redis的单节点带认证的方式,配置文件应该是如下所示:
redis单节点有认证,则message_queue的url为:
冒号+密码+@+host:port
{
"message_queue": "redis://:redispass@192.168.0.1:5000/0"
}
因此,我们可以模仿,如果使用redis集群带认证的方式按照redis单点类似的配置要怎么实现呢?
即配置文件如下:
{
"message_queue": "redis://:redispass@192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}
如果要实现上面这种配置方式,显然我们是需要改代码的,首先要改的就是解析配置文件的地方。
def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):
    """
    create connection to message queue

    name:
        name of message queue
    url:
        connection url, its scheme selects the backend (formats below)
    maxsize:
        forwarded to the selected Queue class
    lazy_limit:
        forwarded to the rabbitmq, redis and kombu Queue classes

    rabbitmq:
        amqp://username:password@host:5672/%2F
        see https://www.rabbitmq.com/uri-spec.html
    beanstalk:
        beanstalk://host:11300/
    redis:
        redis://host:6379/db
        redis://:password@host:6379/db
        redis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)
        redis://:password@host1:port1,host2:port2,...,hostn:portn (cluster mode with auth)
    kombu:
        kombu+transport://userid:password@hostname:port/virtual_host
        see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
    builtin:
        None

    Raises Exception when the url matches none of the schemes above.
    """
    if not url:
        from pyspider.libs.multiprocessing_queue import Queue
        return Queue(maxsize=maxsize)

    parsed = urlparse.urlparse(url)
    if parsed.scheme == 'amqp':
        from .rabbitmq import Queue
        return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
    elif parsed.scheme == 'beanstalk':
        from .beanstalk import Queue
        return Queue(name, host=parsed.netloc, maxsize=maxsize)
    elif parsed.scheme == 'redis':
        from .redis_queue import Queue
        # password taken from the url, shared by the single-node and cluster paths
        password = parsed.password or None
        if ',' in parsed.netloc:
            # redis in cluster mode (there is no concept of 'db' in cluster mode)
            # ex. redis://:password@host1:port1,host2:port2,...,hostn:portn
            #
            # Drop the optional "user:password@" prefix once, before splitting,
            # so a username or a comma inside the password cannot leak into the
            # node list.
            hosts = parsed.netloc.rpartition('@')[2]
            cluster_nodes = []
            for node in hosts.split(','):
                parts = node.split(':')
                cluster_nodes.append({'host': parts[0], 'port': int(parts[1])})
            # hand the password to the cluster-mode constructor as well
            return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit,
                         cluster_nodes=cluster_nodes, password=password)
        else:
            db = parsed.path.lstrip('/').split('/')
            try:
                db = int(db[0])
            except ValueError:
                # missing or non-numeric db segment: fall back to db 0
                logging.warning('redis DB must zero-based numeric index, using 0 instead')
                db = 0
            return Queue(name=name, host=parsed.hostname, port=parsed.port, db=db,
                         maxsize=maxsize, password=password, lazy_limit=lazy_limit)
    elif url.startswith('kombu+'):
        # strip the 'kombu+' marker, the remainder is passed on as the url
        url = url[len('kombu+'):]
        from .kombu_queue import Queue
        return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
    else:
        # format the url into the message instead of passing it as a second arg
        raise Exception('unknown connection url: %s' % url)
由上述代码可知,我们改动的地方很少,所以还是非常简单的。
class RedisQueue(object):
    """
    A Queue like message built over redis
    """

    Empty = BaseQueue.Empty
    Full = BaseQueue.Full
    max_timeout = 0.3

    def __init__(self, name, host='localhost', port=6379, db=0,
                 maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):
        """
        Constructor for RedisQueue

        name: queue name, kept on the instance as ``self.name``
        host, port, db: connection settings for the single-node client
        maxsize: an integer that sets the upperbound limit on the number of
            items that can be placed in the queue.
        lazy_limit: redis queue is shared via instance, a lazy size limit is used
            for better performance.
        password: passed to whichever client is created (single node or cluster)
        cluster_nodes: when not None, passed to RedisCluster as ``startup_nodes``;
            host, port and db are not used on that path
        """
        self.name = name
        if cluster_nodes is not None:
            # StrictRedisCluster has been removed from newer releases of the
            # cluster client, so RedisCluster is used instead
            from rediscluster import RedisCluster
            # pass the password to the constructor so clusters that require
            # authentication can be used
            self.redis = RedisCluster(startup_nodes=cluster_nodes, password=password)
        else:
            self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
        self.maxsize = maxsize
        self.lazy_limit = lazy_limit
        self.last_qsize = 0
整个代码的改动也就不超过10行,就可以完美的实现我们的需求。
当然,因为rediscluster工具包不再维护了,我们需要更换新的redis集群工具包。
pip install redis==3.5.3
pip install redis-py-cluster==2.1.3