a.url管理模块
b.HTML下载模块
c.数据存储模块
d.爬虫调度模块
同步和异步关注的是消息通信机制
同步:再发出一个调用时,在没有得到结果之前,该调用就不会返回,但是一旦调用返回,就得到返回值。换句话说,就是由调用者主动等待这个调用结果
异步:调用在发出之后,这个调用就直接返回了,所以没有返回结果。当一个异步过程调用发出之后,调用者不会立刻得到结果,而是再在调用发出后,被调用这通过状态、通知来通知调用者,或通过回调函数处理这个调用
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
阻塞调用是指调用结果返回之前,当前线程会被挂起
调用线程只有在得到结果之后才会返回
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
coroutines:一种比线程更加轻量级的存在,一个线程拥有多个协程。
协程被操作系统内核管理,完全可以自由程序控制,这样就是在性能上得到了很大的提升,因为单线程所以避免了上下文之间的切换
通过yield/send可以实现一个协程,yield可以让协程暂停和线程的阻塞是有本质的却别,协程暂停完全由程序控制,线程的阻塞状态由操作系统内核控制。开销远小于线程。python3.5以后 async/await成为了更好的替代方案
一个异步io库,从python3.4开始加入官方库,一直到python3.5才将许多临时api定格下来,知道python3.6才定格,python3.6为其加入async和await两大关键字,到python3.7加入asyncio.run方法,进一步简化运行入口的写法
事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为“一种等待程序分配事件或消息的变成架构”,我们可以定义事件循环来简化使用轮询方法来监控时间。他的意义最通俗的说法是“当A发生时,执行B”。他是asyncio提供的“中央处理设备”
Cypthon编写、用来代替asyncio事件循环。
作者说“他在速度上至少比NOde.js、geven以及其他任何python异步框架快2倍”
mongodb的异步版本 Motor
mongodb文件存储系统 GridFS
GridFS:https://blog.csdn.net/u011194506/article/details/75578818?utm_medium=distribute.pc_relevant.none-task-blog-searchFromBaidu-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-searchFromBaidu-1.control
import asyncio
import aiohttp
# 这里加异常捕获是因为uvloop不支持windos环境
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError as e:
pass
url = "http://www.baidu.com"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/88.0.4324.104 Safari/537.36",
}
async def fetch(url):
timeout = aiohttp.ClientTimeout(total=5)
# tc为一个connector 限制TCP的连接数 忽略SSL证书错误
async with aiohttp.TCPConnector(limit=30, verify_ssl=False)as tc:
# 定义一个session,通过异步上下文关键字定义
async with aiohttp.ClientSession(connector=tc,timeout=timeout)as session:
# req就是response
async with session.get(url, headers=headers)as req:
status = req.status
if status in [200, 201]:
# 因为异步所以需要用await去调用
source = await req.text()
print(source)
return source
if __name__ == '__main__':
# 创建一个事件循环a
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(fetch(url)) for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.sleep(0))
loop.close()
pip install aiohttp-socks
from aiohttp_socks import SocksConnector
async with semaphore:
async with aiohttp.ClientSession(connector=SocksConnector.from_url(proxies) if proxies else None) as session:
async with session.get(url, headers=headers, allow_redirects=True, timeout=20) as response:
content, text = await response.read(), await response.text("utf-8","ignore")
status_code = response.status
response_headers = dict(response.headers)
reason = response.reason
return url, status_code, response_headers, content, text, reason
这里之前使用的是SocksConnector,但会出现错误(没有看具体原因),后来改用ProxyConnector
from aiohttp_socks import SocksConnector, ProxyConnector
async with aiohttp.ClientSession(connector=ProxyConnector.from_url(proxy) if proxy else None) as session:
import asyncio
import time
import aiohttp
import requests
import html
import logging.config
from new.log_config import log_conf
from lxml import etree
# 使用SocksConnector会存在某些问题
from aiohttp_socks import SocksConnector, ProxyConnector
from queue import Queue
from fake_useragent import UserAgent
# log
logging.config.dictConfig(log_conf)
logger = logging.getLogger("buggy")
session = requests.session()
proxy = '这里是socks5代理'
ua_sum = UserAgent()
ua = ua_sum.random
headers = {
"User-Agent": ua,
"Cookie": 'BAIDUID=1233:FG=1; Hm_lpvt_={}'.format(int(time.time())),
}
# 获取到的数据直接放到队列里
page_list_queue = Queue()
async def get_list_url(keyword, pagenum):
url_list = []
for num in range(pagenum):
number = num * 10
url = f"https://wenku.baidu.com/search/main?word={keyword}" \
f"&org=0&fd=0&lm=0&od=0&ie=gbk&fr=top_home&pn={number}"
url_list.append(url)
return url_list
async def listpage_request(session, url):
async with session.get(url=url, headers=headers, verify_ssl=False, timeout=10) as res:
response_text = await res.text("gbk", errors="ignore")
await deal_listpage(response_text)
async def deal_listpage(response_text):
serch_html = etree.HTML(response_text)
serch_list = serch_html.xpath("//div[@class='main']/div[@class='search-result']/dl")
for li in serch_list:
tag_str = etree.tostring(li, method='html').decode()
tag_html = html.unescape(tag_str)
tag_html.replace('<em>', '').replace('</em>', '')
li = etree.HTML(tag_html)
# title
title = li.xpath(".//dt/p[1]/a/@title")[0]
# url
url = li.xpath(".//dt/p[1]/a/@href")[0]
# 来源网站 source_web
source_web = "百度文库"
# 描述 describe
describe_tag = li.xpath('.//div[@class="summary-box fl"]/p[@class="summary lh21"]')[0]
describe_str = etree.tostring(describe_tag, method='html').decode("utf8", "ignore")
describe_html_str = html.unescape(describe_str)
describe = describe_html_str[24:-5].replace('<em>', '').replace('</em>', '')
data = {
"title": title,
"source_web": source_web,
"describe": describe,
"details_url": url,
}
page_list_queue.put(data)
async def get_list_page(keyword):
url_list = await get_list_url(keyword, 77)
async with aiohttp.ClientSession(connector=ProxyConnector.from_url(proxy) if proxy else None) as session:
tasks = [asyncio.create_task(listpage_request(session, url)) for url in url_list]
await asyncio.wait(tasks)
def baidu_list_main(keyword):
loop = asyncio.get_event_loop()
loop.run_until_complete(get_list_page(keyword))
data_list = []
while page_list_queue.empty() is not True:
data = page_list_queue.get()
data_list.append(data)
return {"keyword": keyword, "return_size": len(data_list), "return_list": data_list}
if __name__ == '__main__':
start = time.time()
print(baidu_list_main("的"))
print(time.time() - start)
如果把close注释掉,就不会出现这个问题,大佬(感谢博文大佬,本次关于aiohttp得学习完全靠博文大佬得支持)告诉我在实际开发中大部分也是不用close的,如果close必须,参考:https://github.com/aio-libs/aiohttp/issues/4324#issuecomment-733884349
解决办法:(重写close中的方法)
from functools import wraps
from asyncio.proactor_events import _ProactorBasePipeTransport
def silence_event_loop_closed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
raise
return wrapper
_ProactorBasePipeTransport.__del__ = silence_event_loop_closed(_ProactorBasePipeTransport.__del__)