#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Jacky 2020 All rights reserved.
import os
import sys
import six
import copy
import time
import shutil
import logging
import logging.config
import click
import pyspider
from pyspider.message_queue import connect_message_queue
from pyspider.database import connect_database
from pyspider.libs import utils
def read_config(ctx, param, value):
    """Click callback: load a JSON config file and install it as the
    context's default map for subcommands.

    Keys containing dashes are normalized to underscores (recursively) so
    they line up with click's option parameter names. Returns the parsed
    config dict, or {} when no file was given.
    """
    if not value:
        return {}
    import json

    def underline_dict(d):
        # Recursively rewrite 'foo-bar' keys into 'foo_bar'; non-dict
        # values pass through untouched.
        if not isinstance(d, dict):
            return d
        return {k.replace('-', '_'): underline_dict(v) for k, v in six.iteritems(d)}

    config = underline_dict(json.load(value))
    ctx.default_map = config
    return config
def connect_db(ctx, param, value):
    """Click callback: wrap a database url in a lazily-evaluated getter.

    Returns ``None`` when no url was supplied; otherwise a ``utils.Get``
    whose callable opens the connection on first access.
    """
    if value:
        return utils.Get(lambda: connect_database(value))
    return None
def load_cls(ctx, param, value):
    """Click callback: resolve a dotted-path string to the object it names.

    Anything that is not a string (e.g. an already-loaded class) is passed
    through unchanged.
    """
    if not isinstance(value, six.string_types):
        return value
    return utils.load_object(value)
def connect_rpc(ctx, param, value):
    """Click callback: build an XML-RPC client proxy for the given url.

    Returns ``None`` when no url was supplied. ``allow_none=True`` lets
    ``None`` travel across the RPC boundary.
    """
    if value:
        try:
            from six.moves import xmlrpc_client
        except ImportError:
            # Python 2 fallback when six.moves is unavailable.
            import xmlrpclib as xmlrpc_client
        return xmlrpc_client.ServerProxy(value, allow_none=True)
    return None
# Root command group: builds every shared resource (databases, message
# queues, proxy addresses) from CLI options / environment variables and
# stores them in ctx.obj for the subcommands to consume.
@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--puppeteer-proxy', envvar='PUPPETEER_PROXY', help="puppeteer proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.
    """
    if kwargs['add_sys_path']:
        # Make project modules in the invocation directory importable.
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # get db from env
    # For each database not given on the command line, fall back in order:
    # MySQL / MongoDB / CouchDB env vars (names like MYSQL_PORT_3306_TCP_ADDR
    # follow the legacy docker --link convention — NOTE(review): confirm),
    # then bench-mode throwaway stores, then a local sqlite file under
    # data-path.
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            continue
        # `lambda db=db` binds the loop variable at definition time so each
        # lazy getter connects to its own database, not the last one.
        if os.environ.get('MYSQL_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif os.environ.get('COUCHDB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'couchdb+%s://%s:%s/%s' % (
                    db,
                    os.environ['COUCHDB_PORT_5984_TCP_ADDR'] or 'couchdb',
                    os.environ['COUCHDB_PORT_5984_TCP_PORT'] or '5984',
                    db)))
        elif ctx.invoked_subcommand == 'bench':
            # Benchmark mode: wipe and recreate a dedicated data dir and use
            # throwaway backends (sqlite url with empty path for task/result;
            # projectdb served from the bundled bench.py project).
            if kwargs['data_path'] == './data':
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                os.mkdir(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            elif db in ('projectdb', ):
                kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % (
                    db, os.path.join(os.path.dirname(__file__), 'libs/bench.py'))))
        else:
            # Default: per-db sqlite file under data-path, e.g. task.db.
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            # Flag that this db was defaulted — presumably consumed by
            # subcommands to warn about non-production storage; verify.
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.mkdir(kwargs['data_path'])

    # message queue, compatible with old version
    # Precedence: --message-queue, then deprecated --amqp-url, then the
    # RABBITMQ_* env vars (%%2F is the url-encoded default vhost "/").
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)

    # With an external broker each queue is connected lazily (utils.Get);
    # without one, in-process queues are created eagerly right here.
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy
    # Derive host:port from the PHANTOMJS_PORT_25555_TCP env var by stripping
    # its 'tcp://' scheme prefix.
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    # puppeteer-proxy (same env-var scheme-stripping as phantomjs above)
    if kwargs.get('puppeteer_proxy'):
        pass
    elif os.environ.get('PUPPETEER_NAME'):
        kwargs['puppeteer_proxy'] = os.environ['PUPPETEER_PORT_22222_TCP'][len('tcp://'):]

    # Publish everything to the context object shared by subcommands;
    # 'instances' collects started components for later joining/shutdown —
    # NOTE(review): consumers not visible in this chunk, confirm.
    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    # Bare `pyspider` with no subcommand runs the `all` subcommand
    # (presumably defined later in this file — not the builtin all()).
    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx
@cli.command()
@click.option('--xmlrpc', is_flag=True, help="Enable xmlrpc (Default=True)")
@click.option('--no-xmlrpc', is_flag=True, help="Disable xmlrpc")
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
help='size limit of task queue for each project, '
'tasks will been ignored when overflow')
@click.option('--delete-time', default=24 * 60 * 60,
help='delete time before marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
@click.option('--fail-pause-num', default=10, help='auto pause the project when last FAIL_PAUSE_NUM task failed, set 0 to disable')
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
help='scheduler class to be used.')
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, no_xmlrpc, xmlrpc_host, xmlrpc_port,
inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num,
scheduler_cls, threads, get_object=False):
"""
Run Scheduler, only one scheduler is allowed.
"""
g = ctx.obj
Scheduler = load_cls(None, None, scheduler_cls)