import asyncio
import datetime
import inspect
import itertools
import json
import os
import threading
import time
from asyncio.futures import Future
from concurrent.futures import ThreadPoolExecutor
from urllib import request
from urllib.parse import urlparse

import aiobotocore
import aiohttp
import boto3
import requests
from aiohttp.client_reqrep import ClientResponse
# S3 destination and credentials.
# The bucket and credentials may be supplied through the environment so that
# real secrets never need to be committed to source. The literal defaults are
# placeholders and match the previous hard-coded values.
bucket = os.environ.get('S3_BUCKET', 'bucket')
# Region is kept literal on purpose. Other environments (e.g. Lambda) set
# AWS_REGION automatically, which could point the client away from the
# bucket's region.
REGION_NAME = "us-west-2"
# Alternative region previously used: "ap-northeast-1"
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', "AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', "AWS_SECRET_ACCESS_KEY")
# Content-Type sent to S3, keyed by the first path segment of the object key.
# "video/mp4" replaces the original "video/mpeg4", which is not a registered
# MIME type.
MEDIA_CONTENT_TYPES = {'image': 'image/jpeg', 'video': 'video/mp4'}


async def download(l, m, n, media, client):
    """Fetch one media file over HTTP and upload it to S3 with a public-read ACL.

    Args:
        l, m, n: Labels that are only echoed in the completion log line.
        media: Mapping with ``'url'`` (the source URL) and ``'filename'``
            (the S3 object key). The key's first path segment (e.g. ``image``
            or ``video``) selects the Content-Type.
        client: S3 client whose ``put_object`` is awaitable.

    Returns:
        None. Every failure is printed and swallowed, so one bad item does
        not abort the sibling downloads scheduled with ``asyncio.gather``.
    """
    path = None
    try:
        # Read the keys inside the try: a malformed entry should be logged,
        # not propagate out of gather() and abort the whole batch.
        path = media['url']
        target = media['filename']
        print(target)

        # Resolve the Content-Type before downloading, so an unsupported key
        # prefix is reported without spending bandwidth on the body.
        filetype = target.split('/')[0]
        content_type = MEDIA_CONTENT_TYPES.get(filetype)
        if content_type is None:
            print("unsupported media type {!r}, {}".format(filetype, target))
            return

        urlobj = urlparse(path)
        host = urlobj.netloc
        headers = {
            'Host': host,
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': urlobj.scheme + '://' + host,
            'Connection': 'keep-alive',
            'Cache-Control': 'max-age=0',
        }
        # The async with block releases the response on exit, so no explicit
        # release() call is needed.
        async with aiohttp.request(method='GET', url=path, headers=headers) as response:
            if response.status != 200:
                print("{}, {}".format(response.status, target))
                return
            body = await response.content.read()
            await client.put_object(Body=body, Bucket=bucket, Key=target,
                                    ACL='public-read', ContentType=content_type)
            print("{} {} {} completed>>>>>>>>>> {} ".format(l, m, n, target))
    except Exception as e:
        # Best effort by design. Use repr so the exception type is visible even
        # when its message is empty (e.g. timeouts).
        print(repr(e), path)
# Upper bound on how many downloads are scheduled in one run.
MAX_DOWNLOADS = 100


def parse_message_stream(text):
    """Parse a body of back-to-back JSON documents into a list.

    The feed writes JSON objects one after another, e.g. ``{"a": 1}{"b": 2}``,
    optionally separated by whitespace. Each document is decoded in turn.
    Unlike splitting on ``'}{'``, this is not confused by string values that
    contain ``}{``, ``true`` or ``false``.

    Raises:
        json.JSONDecodeError: if the body is not a sequence of JSON documents.
    """
    decoder = json.JSONDecoder()
    messages = []
    pos = 0
    end = len(text)
    while True:
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < end and text[pos] in ' \t\n\r':
            pos += 1
        if pos >= end:
            return messages
        obj, pos = decoder.raw_decode(text, pos)
        messages.append(obj)


def iter_media(messages):
    """Yield ``(message_index, media_index, media)`` for every media entry.

    Messages without a ``'media_urls'`` entry (or with an empty one) are skipped.
    """
    for msg_no, message in enumerate(messages):
        for media_no, media in enumerate(message.get('media_urls') or ()):
            yield msg_no, media_no, media


if __name__ == '__main__':
    # URL of the feed listing the media to mirror (set MEDIA_FEED_URL).
    path = os.environ.get('MEDIA_FEED_URL', '')
    if not path:
        raise SystemExit('set MEDIA_FEED_URL to the media feed URL')
    rep = requests.get(path, timeout=60)
    rep.raise_for_status()
    # Decode as JSON instead of eval(): the body comes from the network.
    msglist = parse_message_stream(rep.text)

    loop = asyncio.get_event_loop()
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('s3', region_name=REGION_NAME,
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID)
    print("加载异步任务")
    print(len(msglist))
    start = datetime.datetime.now()
    try:
        # islice enforces an exact cap across all messages.
        selected = itertools.islice(iter_media(msglist), MAX_DOWNLOADS)
        # Log labels: message index, media index within message, task number.
        tasks = [download(msg_no, media_no, task_no, media, client)
                 for task_no, (msg_no, media_no, media) in enumerate(selected)]
        print("开始执行任务")
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        # Depending on the aiobotocore version, close() is synchronous or
        # returns an awaitable. Drive the latter so the HTTP session really
        # closes.
        closing = client.close()
        if inspect.isawaitable(closing):
            loop.run_until_complete(closing)
        loop.close()
    print("start {} \n end {}".format(start, datetime.datetime.now()))
# 原文链接 (original article): http://www.aisir.cn/2017/07/21/aws-lambda%E4%B9%8B%E5%BC%82%E6%AD%A5%E5%AE%9E%E7%8E%B0%E6%96%87%E4%BB%B6%E7%9A%84%E4%B8%8B%E8%BD%BD%E4%B8%8A%E4%BC%A0