当前位置: 首页 > 工具软件 > loop > 使用案例 >

aiohttp学习笔记及RuntimeError: Event loop is closed解决办法(close问题)

乜昆
2023-12-01

aiohttp学习笔记及RuntimeError: Event loop is closed解决办法(close问题)

一、概念

1.爬虫的基本组成
	a.url管理模块
	b.HTML下载模块
	c.数据存储模块
	d.爬虫调度模块
2.同步和异步
	同步和异步关注的是消息通信机制
	
	同步:再发出一个调用时,在没有得到结果之前,该调用就不会返回,但是一旦调用返回,就得到返回值。换句话说,就是由调用者主动等待这个调用结果
	
	异步:调用在发出之后,这个调用就直接返回了,所以没有返回结果。当一个异步过程调用发出之后,调用者不会立刻得到结果,而是再在调用发出后,被调用这通过状态、通知来通知调用者,或通过回调函数处理这个调用
3.阻塞与非阻塞
	阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
	阻塞调用是指调用结果返回之前,当前线程会被挂起
	调用线程只有在得到结果之后才会返回
	非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程 
4.协程
	coroutines:一种比线程更加轻量级的存在,一个线程拥有多个协程。
	协程被操作系统内核管理,完全可以自由程序控制,这样就是在性能上得到了很大的提升,因为单线程所以避免了上下文之间的切换
	通过yield/send可以实现一个协程,yield可以让协程暂停和线程的阻塞是有本质的却别,协程暂停完全由程序控制,线程的阻塞状态由操作系统内核控制。开销远小于线程。python3.5以后 async/await成为了更好的替代方案
5.asyncio
	一个异步io库,从python3.4开始加入官方库,一直到python3.5才将许多临时api定格下来,知道python3.6才定格,python3.6为其加入async和await两大关键字,到python3.7加入asyncio.run方法,进一步简化运行入口的写法
6.事件循环
	事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为“一种等待程序分配事件或消息的变成架构”,我们可以定义事件循环来简化使用轮询方法来监控时间。他的意义最通俗的说法是“当A发生时,执行B”。他是asyncio提供的“中央处理设备”
7.uvlop
	Cypthon编写、用来代替asyncio事件循环。
	作者说“他在速度上至少比NOde.js、geven以及其他任何python异步框架快2倍”
8.其他
	mongodb的异步版本 Motor
	mongodb文件存储系统 GridFS

GridFS:https://blog.csdn.net/u011194506/article/details/75578818?utm_medium=distribute.pc_relevant.none-task-blog-searchFromBaidu-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-searchFromBaidu-1.control

二、示例代码

1.aiohttp
import asyncio
import aiohttp


# 这里加异常捕获是因为uvloop不支持windos环境
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError as e:
    pass
url = "http://www.baidu.com"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/88.0.4324.104 Safari/537.36",
}


async def fetch(url):
    timeout = aiohttp.ClientTimeout(total=5)
    
    #  tc为一个connector          限制TCP的连接数 忽略SSL证书错误
    async with aiohttp.TCPConnector(limit=30, verify_ssl=False)as tc:
        
        #  定义一个session,通过异步上下文关键字定义
        async with aiohttp.ClientSession(connector=tc,timeout=timeout)as session:
            # req就是response
            async with session.get(url, headers=headers)as req:
                status = req.status
                if status in [200, 201]:
                    
                    # 因为异步所以需要用await去调用
                    source = await req.text()
                    print(source)
                    return source


if __name__ == '__main__':
    # 创建一个事件循环a
    loop = asyncio.new_event_loop()
    
    asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(fetch(url)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    # loop.run_until_complete(asyncio.sleep(0))
    loop.close()
2.aiohttp添加socks代理

pip install aiohttp-socks

from aiohttp_socks import SocksConnector
async with semaphore:
    async with aiohttp.ClientSession(connector=SocksConnector.from_url(proxies) if proxies else None) as session:
        async with session.get(url, headers=headers, allow_redirects=True, timeout=20) as response:
            content, text = await response.read(), await response.text("utf-8","ignore")
            status_code = response.status
            response_headers = dict(response.headers)
            reason = response.reason
            return url, status_code, response_headers, content, text, reason

这里之前使用的是SocksConnector,但会出现错误(没有看具体原因),后来改用ProxyConnector

from aiohttp_socks import SocksConnector, ProxyConnector
    async with aiohttp.ClientSession(connector=ProxyConnector.from_url(proxy) if proxy else None) as session:
3.百度文库列表页代码示例
import asyncio
import time
import aiohttp
import requests
import html
import logging.config
from new.log_config import log_conf
from lxml import etree
# 使用SocksConnector会存在某些问题
from aiohttp_socks import SocksConnector, ProxyConnector
from queue import Queue
from fake_useragent import UserAgent

# log
logging.config.dictConfig(log_conf)
logger = logging.getLogger("buggy")

session = requests.session()

proxy = '这里是socks5代理'

ua_sum = UserAgent()
ua = ua_sum.random
headers = {
    "User-Agent": ua,
    "Cookie": 'BAIDUID=1233:FG=1; Hm_lpvt_={}'.format(int(time.time())),
}
# 获取到的数据直接放到队列里
page_list_queue = Queue()


async def get_list_url(keyword, pagenum):
    url_list = []
    for num in range(pagenum):
        number = num * 10
        url = f"https://wenku.baidu.com/search/main?word={keyword}" \
              f"&org=0&fd=0&lm=0&od=0&ie=gbk&fr=top_home&pn={number}"
        url_list.append(url)
    return url_list


async def listpage_request(session, url):
    async with session.get(url=url, headers=headers, verify_ssl=False, timeout=10) as res:
        response_text = await res.text("gbk", errors="ignore")
        await deal_listpage(response_text)


async def deal_listpage(response_text):
    serch_html = etree.HTML(response_text)
    serch_list = serch_html.xpath("//div[@class='main']/div[@class='search-result']/dl")
    for li in serch_list:
        tag_str = etree.tostring(li, method='html').decode()
        tag_html = html.unescape(tag_str)
        tag_html.replace('<em>', '').replace('</em>', '')
        li = etree.HTML(tag_html)

        # title
        title = li.xpath(".//dt/p[1]/a/@title")[0]
        # url
        url = li.xpath(".//dt/p[1]/a/@href")[0]
        # 来源网站 source_web
        source_web = "百度文库"
        # 描述 describe
        describe_tag = li.xpath('.//div[@class="summary-box fl"]/p[@class="summary lh21"]')[0]
        describe_str = etree.tostring(describe_tag, method='html').decode("utf8", "ignore")
        describe_html_str = html.unescape(describe_str)
        describe = describe_html_str[24:-5].replace('<em>', '').replace('</em>', '')
        data = {
            "title": title,
            "source_web": source_web,
            "describe": describe,
            "details_url": url,
        }
        page_list_queue.put(data)


async def get_list_page(keyword):
    url_list = await get_list_url(keyword, 77)
    async with aiohttp.ClientSession(connector=ProxyConnector.from_url(proxy) if proxy else None) as session:
        tasks = [asyncio.create_task(listpage_request(session, url)) for url in url_list]
        await asyncio.wait(tasks)


def baidu_list_main(keyword):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_list_page(keyword))
    data_list = []
    while page_list_queue.empty() is not True:
        data = page_list_queue.get()
        data_list.append(data)
    return {"keyword": keyword, "return_size": len(data_list), "return_list": data_list}


if __name__ == '__main__':
    start = time.time()
    print(baidu_list_main("的"))
    print(time.time() - start)

三、踩过的坑

代码完全没有问题(参考示例代码1),但会报RuntimeError: Event loop is closed

如果把close注释掉,就不会出现这个问题,大佬(感谢博文大佬,本次关于aiohttp得学习完全靠博文大佬得支持)告诉我在实际开发中大部分也是不用close的,如果close必须,参考:https://github.com/aio-libs/aiohttp/issues/4324#issuecomment-733884349

解决办法:(重写close中的方法)

from functools import wraps
from asyncio.proactor_events import _ProactorBasePipeTransport

def silence_event_loop_closed(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RuntimeError as e:
            if str(e) != 'Event loop is closed':
                raise
    return wrapper

_ProactorBasePipeTransport.__del__ = silence_event_loop_closed(_ProactorBasePipeTransport.__del__)
 类似资料: