当前位置: 首页 > 工具软件 > py-amqp > 使用案例 >

Web Crawler 网络爬虫源码 - Python 语言 - 立哥开发

戎洛城
2023-12-01

#Jacky 2020 All rights reserved.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:

import os
import sys
import six
import copy
import time
import shutil
import logging
import logging.config

import click
import pyspider
from pyspider.message_queue import connect_message_queue
from pyspider.database import connect_database
from pyspider.libs import utils


def read_config(ctx, param, value):
    """Click callback for ``--config``: load subcommand defaults from JSON.

    Every dict key, at any nesting depth, has ``-`` replaced by ``_`` so
    that names written CLI-style (``queue-maxsize``) line up with click's
    parameter names (``queue_maxsize``). Lists and scalars are returned as
    they are; lists are not descended into. The converted value is stored
    on ``ctx.default_map`` and also returned.

    When no file was given, ``{}`` is returned and ``ctx`` is left alone.
    """
    if not value:
        return {}
    import json

    def to_underscores(node):
        # Only mappings get their keys rewritten; everything else passes through.
        if isinstance(node, dict):
            return {key.replace('-', '_'): to_underscores(child)
                    for key, child in six.iteritems(node)}
        return node

    converted = to_underscores(json.load(value))
    ctx.default_map = converted
    return converted


def connect_db(ctx, param, value):
    """Click callback for the ``--taskdb`` / ``--projectdb`` / ``--resultdb`` options.

    Returns ``None`` when no URL was supplied, which lets ``cli`` substitute
    a default database. Otherwise it returns ``connect_database(value)``
    wrapped in ``utils.Get`` via a lambda, so the connection is not opened
    here (assumes ``utils.Get`` calls its factory on demand; confirm in
    ``pyspider.libs.utils``).
    """
    if value:
        return utils.Get(lambda: connect_database(value))
    return None


def load_cls(ctx, param, value):
    """Click callback that resolves a dotted import path to the object it names.

    Only string values are passed to ``utils.load_object``. Anything else,
    such as an already-loaded class or ``None``, comes back unchanged. This
    makes it safe to call again on a value the option callback has already
    resolved.
    """
    if not isinstance(value, six.string_types):
        return value
    return utils.load_object(value)


def connect_rpc(ctx, param, value):
    """Click callback turning an XML-RPC URL into a ``ServerProxy``.

    Returns ``None`` when no URL is given. The client module comes from
    ``six.moves`` and falls back to Python 2's ``xmlrpclib`` if that import
    fails. ``allow_none=True`` lets ``None`` be marshalled in calls made
    through the proxy.
    """
    if value:
        try:
            from six.moves import xmlrpc_client as rpc_module
        except ImportError:
            import xmlrpclib as rpc_module
        return rpc_module.ServerProxy(value, allow_none=True)
    return None


@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--puppeteer-proxy', envvar='PUPPETEER_PROXY', help="puppeteer proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.
    """
    # The docstring above is click's --help text for the top-level command,
    # so implementation notes live in comments instead.
    #
    # This callback normalizes the global options into ``ctx.obj`` (a
    # utils.ObjectDict shared with subcommands). It:
    #   * fills in taskdb/projectdb/resultdb when not given explicitly;
    #   * connects the inter-component message queues;
    #   * resolves the phantomjs/puppeteer proxy addresses from container-link
    #     environment variables;
    #   * runs the ``all`` command when no subcommand was requested, unless
    #     ``ctx.obj`` carries ``testing_mode``.
    if kwargs['add_sys_path']:
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # Fill in databases that were not given explicitly. Priority order:
    #   1. linked MySQL, MongoDB, or CouchDB containers (detected via *_NAME);
    #   2. the bench presets;
    #   3. sqlite files under --data-path.
    # Each factory binds ``db`` as a default argument so the lambda does not
    # see the loop variable's final value (late-binding closure).
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            continue
        if os.environ.get('MYSQL_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif os.environ.get('COUCHDB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'couchdb+%s://%s:%s/%s' % (
                    db,
                    # .get() so the host/port fallbacks apply when the link
                    # variables are absent; indexing os.environ would raise
                    # KeyError and never reach the ``or`` default.
                    os.environ.get('COUCHDB_PORT_5984_TCP_ADDR') or 'couchdb',
                    os.environ.get('COUCHDB_PORT_5984_TCP_PORT') or '5984',
                    db)))
        elif ctx.invoked_subcommand == 'bench':
            if kwargs['data_path'] == './data':
                # Benchmarks use a freshly wiped scratch directory. After the
                # first pass data_path no longer equals './data', so this
                # reset happens at most once per invocation.
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                # makedirs rather than mkdir: './data' itself may not exist yet.
                os.makedirs(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            elif db in ('projectdb', ):
                kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % (
                    db, os.path.join(os.path.dirname(__file__), 'libs/bench.py'))))
        else:
            if not os.path.exists(kwargs['data_path']):
                # makedirs so a nested --data-path (e.g. a/b/c) also works.
                os.makedirs(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            # Record that the built-in sqlite default is in use for this db.
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.makedirs(kwargs['data_path'])

    # Message queue URL, kept compatible with older options. Priority order:
    #   1. --message-queue;
    #   2. the deprecated --amqp-url;
    #   3. a linked RabbitMQ container.
    # '%%2F' renders as '%2F', the URL-encoded default vhost '/'.
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)

    # With an external queue URL, each connection is wrapped in utils.Get
    # (deferred). Without one, the built-in queues are created right here,
    # presumably so every component shares the same instances; confirm.
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy: the link variable looks like 'tcp://host:port'; keep 'host:port'.
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    # puppeteer-proxy: same 'tcp://' prefix stripping as above.
    if kwargs.get('puppeteer_proxy'):
        pass
    elif os.environ.get('PUPPETEER_NAME'):
        kwargs['puppeteer_proxy'] = os.environ['PUPPETEER_PORT_22222_TCP'][len('tcp://'):]

    # Merge into any ctx.obj the caller pre-populated (e.g. testing_mode).
    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx


# ``scheduler`` subcommand of ``cli``. click passes each option below to the
# function as a keyword argument; the docstring doubles as its --help text.
@cli.command()
@click.option('--xmlrpc', is_flag=True, help="Enable xmlrpc (Default=True)")
# NOTE(review): --xmlrpc is a plain flag that defaults to False, although its
# help says Default=True. Presumably the body treats xmlrpc as enabled unless
# --no-xmlrpc is given; confirm against the rest of this function.
@click.option('--no-xmlrpc', is_flag=True, help="Disable xmlrpc")
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
              help='size limit of task queue for each project, '
              'tasks will been ignored when overflow')
# 24 * 60 * 60 = 86400, presumably seconds (one day); confirm the unit.
@click.option('--delete-time', default=24 * 60 * 60,
              help='delete time before marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
@click.option('--fail-pause-num', default=10, help='auto pause the project when last FAIL_PAUSE_NUM task failed, set 0 to disable')
# The load_cls callback turns the dotted path into the class object.
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
              help='scheduler class to be used.')
# NOTE(review): default=None here. The "4" in the help text is presumably
# applied by the scheduler class itself; confirm.
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, no_xmlrpc, xmlrpc_host, xmlrpc_port,
              inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num,
              scheduler_cls, threads, get_object=False):
    """
    Run Scheduler, only one scheduler is allowed.
    """
    # get_object is not a click option. It is presumably set only by
    # programmatic callers; TODO confirm its effect further down.
    g = ctx.obj  # shared options/connections populated by ``cli``
    # scheduler_cls has normally been resolved already by the load_cls option
    # callback. load_cls returns non-string values unchanged, so this call only
    # does work when a dotted-path string is passed in directly.
    Scheduler = load_cls(None, None, scheduler_cls)

 类似资料: