https://github.com/Wangler2333/bilibili_video_stat
爬取b站视频信息,供大数据分析用户喜好。使用scrapy-redis分布式,在16核服务器上实现抓取2500万条/天。可长期部署抓取,实现视频趋势分析
pip3 install -r requirements.txt
python3 start.py 进程数
# 例如:python3 start.py 32(启动32个爬虫进程)
建议配合代理ip池使用,提供 https://github.com/arthurmmm/hq-proxies 代理池的调用中间件,需在settings中开启中间件
请在使用前设置settings.py中的redis和mongodb的连接
pipeline.py注释了使用mysql保存的代码,如使用,请在settings.py添加相应设置
# -*- coding: utf-8 -*-
from scrapy_redis.spiders import RedisSpider
import logging
from scrapy.exceptions import CloseSpider
from bilibili.items import BiliBiliData
import json
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BilibiliSpiderSpider(RedisSpider):
    """Spider that converts Bilibili video-stat JSON responses into items."""

    name = 'bilibili_spider'
    # allowed_domains = ['bilibili.com']
    # Redis key that feeds this spider. The "strat" spelling is kept on
    # purpose: the retry middleware in this project pushes URLs to the very
    # same key, so renaming it here alone would break the queue.
    redis_key = "bilibili_spider:strat_urls"

    # HTTP statuses that are accepted for parsing.
    _OK_STATUSES = (200, 301, 302, 303, 307)
    # Counter fields copied from the payload; "--" is stored as 0.
    _STAT_FIELDS = ("view", "danmaku", "reply", "favorite", "coin", "share")

    @staticmethod
    def _stat(data, key):
        """Return ``data[key]``, using 0 when it is missing or equal to "--"."""
        value = data.get(key, 0)
        return 0 if value == "--" else value

    @staticmethod
    def _record_bad_json(url):
        """Append *url* to ./error_json.txt so the failed page can be replayed."""
        with open("./error_json.txt", "a", encoding="utf-8") as fb:
            fb.write(url + "\n")

    def parse(self, response):
        """Parse one response and yield a ``BiliBiliData`` item on success.

        Nothing is yielded when the HTTP status is unexpected, when the body
        is not a JSON object carrying a ``code`` key, or when ``code`` is
        not 0.
        """
        # The status must be checked here when HTTPERROR_ALLOW_ALL = True is
        # set in settings, because error responses then reach this callback.
        if response.status not in self._OK_STATUSES:
            # Log the text directly. The previous code raised CloseSpider and
            # logged the exception object, but CloseSpider keeps its text in
            # ``.reason`` and stringifies to "", so URL and status were lost.
            logger.error("网址:%s 状态码异常:%s" % (response.url, response.status))
            return

        try:
            json_data = json.loads(response.text)
        except Exception as error:
            # Deliberately broad: any decoding problem is logged and the URL
            # is written down instead of failing the callback.
            logger.error((response.url, error))
            self._record_bad_json(response.url)
            return

        # Valid JSON that is not an object with a "code" key is handled the
        # same way as undecodable JSON, instead of raising TypeError/KeyError.
        if not isinstance(json_data, dict) or "code" not in json_data:
            logger.error((response.url, "unexpected JSON payload"))
            self._record_bad_json(response.url)
            return

        if json_data["code"] != 0:
            return

        data = json_data["data"]
        item = BiliBiliData()
        item['aid'] = data.get("aid")
        for field in self._STAT_FIELDS:
            item[field] = self._stat(data, field)
        item['time'] = time.time()
        yield item

        logger.info("爬取完成:%s" % response.url)
        # The log level is set to WARNING, so a completion record is also
        # emitted at that level to make it show up in the log.
        logger.warning("完成:[%s]" % response.url)
# -*- coding: utf-8 -*-
# Define here the models for your spider middleware
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html
from redis import StrictRedis
from scrapy.conf import settings
from scrapy import signals
import random # 随机选择
from .useragent import agents # 导入前面的
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware # UserAegent中间件
from scrapy.downloadermiddlewares.retry import RetryMiddleware # 重写重试中间件
class UserRetryMiddleware(RetryMiddleware):
    """Retry middleware variant that re-queues a failed URL through Redis.

    According to the original author's note this middleware is not enabled.
    """

    def _retry(self, request, reason, spider):
        """Push ``request.url`` back onto the spider's start-URL list.

        A fresh Redis client is built from the project settings on each
        call. The method has no ``return`` statement, so it yields ``None``.
        """
        connection_options = {
            "host": settings["REDIS_HOST"],
            "port": settings["REDIS_PORT"],
            "password": settings["REDIS_PASSWORD"],
            # NOTE(review): this is the proxy-pool DB index; confirm the
            # spider's start-URL list really lives in the same database.
            "db": settings["REDIS_PROXY_DB"],
        }
        queue = StrictRedis(**connection_options)
        failed_url = request.url
        print(failed_url)
        queue.lpush("bilibili_spider:strat_urls", failed_url)
class UserAgentmiddleware(UserAgentMiddleware):
    """Downloader middleware that sets a random User-Agent on each request."""

    def process_request(self, request, spider):
        """Overwrite the ``User-Agent`` header with a random entry of ``agents``."""
        request.headers["User-Agent"] = random.choice(agents)
class BilibiliDownloaderMiddleware(object):
    """Downloader middleware that routes requests through a Redis proxy pool.

    Designed to work with https://github.com/arthurmmm/hq-proxies; it must be
    enabled in settings to take effect. It also connects handlers to the
    spider_opened, spider_closed and spider_error signals for logging.
    """

    # Redis set from which a random proxy is drawn.
    PROXY_POOL_KEY = "hq-proxies:proxy_pool"

    # Redis client, created on first use and then shared by all requests.
    # Previously a new client (and connection pool) was built per request.
    _redis = None

    @classmethod
    def from_crawler(cls, crawler):
        """Create the middleware and connect its signal handlers."""
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(s.spider_error, signal=signals.spider_error)
        return s

    def _proxy_store(self):
        """Return the Redis client for the proxy pool, creating it once."""
        if self._redis is None:
            self._redis = StrictRedis(
                host=settings["REDIS_HOST"],
                port=settings["REDIS_PORT"],
                password=settings["REDIS_PASSWORD"],
                db=settings["REDIS_PROXY_DB"],
            )
        return self._redis

    def process_request(self, request, spider):
        """Set ``request.meta['proxy']`` to a random proxy from the pool.

        When the pool is empty the request is left untouched and a warning
        is logged. Always returns ``None``.
        """
        members = self._proxy_store().srandmember(self.PROXY_POOL_KEY, 1)
        if members:
            proxy = members[0]
            # Members are bytes unless the client decodes responses itself.
            if isinstance(proxy, bytes):
                proxy = proxy.decode()
            spider.logger.info('使用代理[%s]访问[%s]' % (proxy, request.url))
            request.meta['proxy'] = proxy
        else:
            spider.logger.warning('不使用代理访问[%s]' % request.url)
        return None

    def process_response(self, request, response, spider):
        """Return the response unchanged."""
        return response

    def process_exception(self, request, exception, spider):
        """Do nothing and return ``None``."""
        pass

    def spider_opened(self, spider):
        """Log that the spider has been opened."""
        spider.logger.info('爬虫开启: %s' % spider.name)

    def spider_closed(self, spider):
        """Log that the spider has been closed."""
        spider.logger.info('爬虫关闭: %s' % spider.name)

    def spider_error(self, failure, response, spider):
        """Log a callback error and record the URL and traceback on disk.

        Connected to the spider_error signal. The URL is appended to
        ./error_spider.txt and the URL plus traceback to
        ./error_spider_info.txt.
        """
        trace = failure.getTraceback()
        spider.logger.error('[%s],错误:%s' % (response.url, trace))
        # Explicit UTF-8: tracebacks may carry non-ASCII text, which would
        # fail to encode under an ASCII default locale.
        with open("./error_spider.txt", "a", encoding="utf-8") as fa:
            fa.write(response.url)
            fa.write("\n")
        with open("./error_spider_info.txt", "a", encoding="utf-8") as fb:
            fb.write("Error on {0}, traceback: {1}".format(response.url, trace))
            fb.write("\n")
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
# import pymysql
from scrapy.conf import settings
import logging as logger
import time
from pymongo import MongoClient
class BilibiliPipeline(object):
    """Item pipeline that stores each item in a per-day MongoDB collection."""

    def __init__(self):
        """Connect to MongoDB and select today's collection.

        The collection name is ``b_video_stat_`` plus the date (YYYYMMDD) at
        construction time; it is not recomputed afterwards.
        """
        # Credentials are passed to MongoClient and checked against the
        # "bilibili" database. Database.authenticate() is deprecated since
        # PyMongo 3.5 and removed in 4.0.
        client = MongoClient(
            settings["MONGO_HOST"],
            settings["MONGO_PORT"],
            username=settings["MONGO_USERNAME"],
            password=settings["MONGO_PASSWORD"],
            authSource="bilibili",
        )
        # Kept so the connection can be released in close_spider().
        self.client = client
        db = client["bilibili"]
        col_name = "b_video_stat_" + time.strftime("%Y%m%d")
        self.col = db[col_name]

    def close_spider(self, spider):
        """Close the MongoDB client when the spider finishes."""
        self.client.close()

    def process_item(self, item, spider):
        """Insert *item* into the collection and return it unchanged.

        On any insert error the error is logged and the item's ``aid`` is
        appended to ./error_mongo.txt; the item is still returned.
        """
        try:
            self.col.insert_one(dict(item))
        except Exception as error:
            logger.error(error)
            with open("./error_mongo.txt", "a", encoding="utf-8") as fb:
                # .get() so that an item without "aid" cannot raise inside
                # the error handler itself.
                fb.write("aid:" + str(item.get("aid")))
                fb.write("\n")
        return item
# 保存到mysql
# def process_item(self, item, spider):
# # 连接数据库
# connect = pymysql.connect(
# host=settings.MYSQL_HOST,
# db=settings.MYSQL_DBNAME,
# user=settings.MYSQL_USER,
# passwd=settings.MYSQL_PASSWD,
# charset='utf8mb4',
# use_unicode=True)
#
# # 通过cursor执行增删查改
# cursor = connect.cursor()
# try:
# cursor.execute('''insert into b_video_stat(aid,view,danmaku,reply,favorite,coin,share)
# values (%d,%d,%d,%d,%d,%d,%d)''' % (item["aid"],
# item["view"],
# item["danmaku"],
# item["reply"],
# item["favorite"],
# item["coin"],
# item["share"]))
# # 提交sql语句
# connect.commit()
# except Exception as error:
# # 出现错误时打印错误日志
# logging.error((error, item))
#
# connect.close()
# return item
#!/usr/bin/env python
# encoding: utf-8
# @version:
# @author: liduo
# @license:
# @file: start.py
# @time: 2018/5/30 下午10:24
from multiprocessing import Pool
import os
def run():
    """Start one ``scrapy crawl bilibili_spider`` process via the shell."""
    os.system("scrapy crawl bilibili_spider")
def main(number):
    """Run ``run`` *number* times in a process pool of *number* workers.

    Blocks until every submitted task has finished.
    """
    workers = Pool(number)
    # One task per worker slot.
    for _ in range(number):
        workers.apply_async(run)
    # Stop accepting work, then wait for the submitted tasks to finish.
    workers.close()
    workers.join()
if __name__ == '__main__':
    import sys

    # The first CLI argument is the number of scrapy-redis processes to
    # start. A missing, non-integer or non-positive value now ends with a
    # usage message instead of an IndexError/ValueError traceback.
    usage = "用法: python3 start.py 进程数 (例如: python3 start.py 32)"
    try:
        num = int(sys.argv[1])
    except (IndexError, ValueError):
        sys.exit(usage)
    if num < 1:
        sys.exit(usage)
    print("开启[%s]个进程" % num)
    main(num)
    print("进程结束")
1.请准备较大的代理IP池:若数据量在400条/秒,根据b站封ip的测试(短时间请求1000次则封ip),代理池需维持300-400的代理ip
2.请使用短有效期的代理ip:若代理ip有效期为3-5分钟,代理池需维持300-400个ip;若有效期较长,需相应增加代理池的ip量
3.请准备较多的user agent组成池,useragent.py中的量差不多了
4.请开启重试:超时时间调小为10s,重试次数增加为10;实际测试中,重试1次约占30%,重试2次及以上约占10%,最多的重试了6次;重试占比与代理ip质量相关
5.若出现所有请求全部都重试的情况,请开启DOWNLOAD_DELAY = 0.01;实际测试,即使只延迟0.01s也会显著减少重试次数;若重试情况仍严重,请增加至0.25s
6.需详细记录失败的url,保持数据的完整性