基于Python3.5 的asyncio 的 异步爬虫案例:
我们会实现以下功能:
1: 单进程 实现并发 异步爬虫
2: 解决并发过多报错 too many file descriptors in select
3: 实现异步 master-worker 主从模式
以下是具体代码实现:
1:
#coding:utf-8 import time,asyncio,aiohttp from threading import Thread async def do_some_work(n): # print ('start --------url',n) async with aiohttp.ClientSession() as session: async with session.get('https://www.baidu.com/') as resp: return await resp.text() def callable(fuction): print (fuction.result()) now =lambda :time.time() start = now() croutine1=do_some_work(1) tasks=[] for n in range(500): task=asyncio.ensure_future(do_some_work(n)) task.add_done_callback(callable) tasks.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print (now()-start)
以上 是爬虫的并发异步实现。 我的笔记本 windo环境下最多能支持500 个并发数量, 耗时在5s 到10 s 之间。
再多就 会报错 too many file descriptors in select。
2: 解决 too many file descriptors in select:
这个报错的原因是因为 Python 调取的 select 对打开的文件字符有最大长度限制。 在此我们需要限制 并发数量。 一次不要塞那么多任务,或者限制最大并发数量。这个仅供参考 。 数据少的时候正常 多了就不正常了。(建议还是使用限制loop的数量比较好)
#coding:utf-8 import time,asyncio,aiohttp from threading import Thread sem = asyncio.Semaphore(500) async def localserver(semaphore): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get('https://www.baidu.com/') as resp: print('hello') # await asyncio.sleep(3) # 模拟网络延迟 now=lambda :time.time() start=now() async def coro(): semaphore = asyncio.Semaphore(10) # 限制并发量为5 to_get = [localserver(semaphore) for _ in range(100)] # 同时建立20个协程 await asyncio.wait(to_get) # 等待所有协程结束 loop = asyncio.get_event_loop() loop.run_until_complete(coro()) loop.close() print (now()-start)
3: 主从模式 开启两个进程。 一个进从redis 取要执行的url, 塞进另一个进程的loop里面执行:
这个也不能无限并发,和2面临的问题一样,打开的数据过多就报错了。 还有就是 不如1的速度快,1 的速度在我笔记本上单进程能达到 每分钟4k的访问量。 3 则只能达到 2k
import time,asyncio,aiohttp from threading import Thread FLAG = True re=[] flag=0 async def fetch(url,n): print ('--------',n) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: pass # re.append(n) # a = await resp.text() print ('end ',n) return n def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() now =lambda :time.time() start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) t.start() try: n=0 while FLAG: if n<500: url = 'https://www.baidu.com/' asyncio.run_coroutine_threadsafe(fetch(url,n), new_loop) # asyncio.run_coroutine_threadsafe(do_some_work(n), new_loop) n += 1 continue print ('-------------------------', now() - start) except KeyboardInterrupt as e: print ('-------------------------',now()-start) print(asyncio.Task.all_tasks())