当前位置: 首页 > 工具软件 > asyncio > 使用案例 >

asyncio 异步爬虫

潘宝
2023-12-01

基于Python3.5 的asyncio 的 异步爬虫案例:

我们会实现以下功能:

    1: 单进程 实现并发 异步爬虫

    2: 解决并发过多报错  too many file descriptors in select

    3: 实现异步 master-worker 主从模式

以下是具体代码实现:

1:

    

#coding:utf-8
import time,asyncio,aiohttp
from threading import Thread

async def do_some_work(n):
    # print ('start --------url',n)
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.baidu.com/') as resp:
            return await resp.text()

def callable(fuction):
    print (fuction.result())

now =lambda :time.time()
start = now()
croutine1=do_some_work(1)
tasks=[]
for n in range(500):
    task=asyncio.ensure_future(do_some_work(n))
    task.add_done_callback(callable)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print (now()-start)

以上  是爬虫的并发异步实现。    我的笔记本 windo环境下最多能支持500 个并发数量,  耗时在5s 到10 s 之间。 

再多就 会报错  too many file descriptors in select。

2:  解决 too many file descriptors in select:

     这个报错的原因是因为 Python 调取的 select 对打开的文件字符有最大长度限制。  在此我们需要限制 并发数量。 一次不要塞那么多任务,或者限制最大并发数量。这个仅供参考  。 数据少的时候正常  多了就不正常了。(建议还是使用限制loop的数量比较好)

    

#coding:utf-8
import time,asyncio,aiohttp
from threading import Thread

sem = asyncio.Semaphore(500)

async def localserver(semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.baidu.com/') as resp:
                print('hello')
            # await asyncio.sleep(3) # 模拟网络延迟


now=lambda :time.time()
start=now()
async def coro():
    semaphore = asyncio.Semaphore(10) # 限制并发量为5
    to_get = [localserver(semaphore) for _ in range(100)] # 同时建立20个协程
    await asyncio.wait(to_get) # 等待所有协程结束

loop = asyncio.get_event_loop()
loop.run_until_complete(coro())
loop.close()
print (now()-start)

3:  主从模式  开启两个进程。 一个进从redis 取要执行的url, 塞进另一个进程的loop里面执行:

这个也不能无限并发,和2面临的问题一样,打开的数据过多就报错了。  还有就是 不如1的速度快,1 的速度在我笔记本上单进程能达到 每分钟4k的访问量。   3 则只能达到 2k

   

import time,asyncio,aiohttp
from threading import Thread
FLAG = True
re=[]
flag=0
async def fetch(url,n):
    print ('--------',n)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            pass
            # re.append(n)
            # a = await resp.text()
    print ('end ',n)
    return n

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
now =lambda :time.time()
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)
t.start()
try:
    n=0
    while FLAG:
        if n<500:
            url = 'https://www.baidu.com/'
            asyncio.run_coroutine_threadsafe(fetch(url,n), new_loop)
            # asyncio.run_coroutine_threadsafe(do_some_work(n), new_loop)
            n += 1
        continue
    print ('-------------------------', now() - start)
except KeyboardInterrupt as e:
    print ('-------------------------',now()-start)
    print(asyncio.Task.all_tasks())







 类似资料: