aws-lambda之异步实现文件的下载上传

公西国发
2023-12-01
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import threading
from urllib import request
from urllib.parse import urlparse

import aiobotocore
from aiohttp.client_reqrep import ClientResponse
from asyncio.futures import Future
import aiohttp
import datetime

import boto3
import requests
bucket = 'bucket'
REGION_NAME = "us-west-2"
# REGION_NAME = "ap-northeast-1"
AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"


async def download(l, m, n, media,client):
        path = media['url']
        filepath = media['filename']
        urlobj = urlparse(path)
        host = urlobj.netloc
        target = filepath
        print(target)
        headers = {
            'Host': host,
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': urlobj.scheme + '://' + host,
            'Connection': 'keep-alive',
            'Cache-Control': 'max-age=0',
        }
        try:
            async with aiohttp.request(method='GET',url=path,headers=headers) as response:
                    # async with session.get(path) as response:

                    if response.status==200:
                        # 获取media对象 上传到s3
                        filetype = filepath.split('/')[0]
                        mediatype = {'image': 'image/jpeg', 'video': 'video/mpeg4'}
                        chunk = await response.content.read()
                        await  client.put_object(Body=chunk,Bucket=bucket, Key=target,
                                                    ACL='public-read', ContentType= mediatype[filetype])

                        print("{} {} {} completed>>>>>>>>>> {} ".format(l, m, n, target))

                    else:
                        print("{}, {}".format(response.status, target))
                    return await response.release()
        except Exception as e:
            print(e,path)

if __name__ == '__main__':

    path = ''
    rep = requests.get(path)
    body = rep.text
    msgstr = '[' + body.replace('}{', '},{') + ']'
    msgstr = msgstr.replace('false', 'False').replace('true', 'True')
    msglist = eval(msgstr)
    loop = asyncio.get_event_loop()
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('s3', region_name='us-west-2', aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                         aws_access_key_id=AWS_ACCESS_KEY_ID)
    items = list(range(100))
    tasks= []
    print("加载异步任务")
    print(len(msglist))
    tm = lambda :datetime.datetime.now()
    start = tm()
    i =0
    for j in msglist:
        m = j['media_urls']
        for n in m:
            tasks.append(download(1,2,3,n,client))
        i+=1
        if i>100:
            break
    a= asyncio.gather(*tasks)
    print("开始执行任务")
    loop.run_until_complete(a)
    client.close()
    loop.close()
    print("start {} \n end {}".format(start,tm()))


原文链接:http://www.aisir.cn/2017/07/21/aws-lambda%E4%B9%8B%E5%BC%82%E6%AD%A5%E5%AE%9E%E7%8E%B0%E6%96%87%E4%BB%B6%E7%9A%84%E4%B8%8B%E8%BD%BD%E4%B8%8A%E4%BC%A0

  

转载于:https://www.cnblogs.com/aisir/p/7215977.html

 类似资料: