Amazon Athena是一个交互式查询服务,可以直接在Amazon Simple Storage Service (Amazon S3)中使用SQL进行数据分析。这个工具可以通过AWS管理控制台,以及ODBC驱动或API访问。如果你在S3中存储了海量数据,而且你有SQL技能,直接使用Athena来快速分析海量数据集是非常理想的选择。
#Using AWS console
import boto3
import json
import time
#A session stores configuration state and allows you to create service clients and resources.
session = boto3.Session(profile_name='THE-NAME-OF-PROFILE')
athena = session.client('athena')
response = athena.start_query_execution(
QueryString='SELECT * FROM DATABASE.TABLE LIMIT 5',
QueryExecutionContext={
'Database': 'YOUR-S3-DATABASE-NAME'
},
ResultConfiguration={
'OutputLocation': 'YOUR-OUTPUT-S3-TABLE-NAME',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3'
}
}
)
while True:
try:
query_results = athena.get_query_results(
QueryExecutionId=response['QueryExecutionId']
)
break
except Exception as err:
if 'Query has not yet finished' in err.message:
time.sleep(3)
else:
raise(err)
print(json.dumps(query_results['ResultSet']['Rows'], indent=4, sort_keys=False))
#Using JDBC
import os
import configparser
import pyathenajdbc
# Get aws credentials
aws_config_file = '~/.aws/credentials'
Config = configparser.ConfigParser()
Config.read(os.path.expanduser(aws_config_file))
access_key_id = Config['CONFIG-FILE']['aws_access_key_id']
secret_key_id = Config['CONFIG-FILE']['aws_secret_access_key']
class PyAthenaLoader():
def connect(self):
self.conn = pyathenajdbc.connect(
s3_staging_dir="YOUR-OUTPUT-S3-TABLE-NAME",
access_key=access_key_id,
secret_key=secret_key_id,
region_name="REGION-NAME"
)
def query(self, req):
self.connect()
try:
with self.conn.cursor() as cursor:
cursor.execute(req)
res = cursor.fetchall()
except Exception as X:
return X
finally:
self.conn.close()
return res
athena = PyAthenaLoader()
print(athena.query('SELECT * FROM DATABASE.TABLE LIMIT 5;'))