当前位置: 首页 > 工具软件 > PyAthena > 使用案例 >

如何通过Python来使用AWS Athena服务

徐旻
2023-12-01

什么是Amazon Athena

Amazon Athena是一个交互式查询服务,可以直接在Amazon Simple Storage Service (Amazon S3)中使用SQL进行数据分析。这个工具可以通过AWS管理控制台,以及ODBC驱动或API访问。如果你在S3中存储了海量数据,而且你有SQL技能,直接使用Athena来快速分析海量数据集是非常理想的选择。

Athena的好处

  • 轻松地进行临时分析和复杂的请求
  • 用户无需配置或管理ETL流水线等基础设施,就可以查询数据。
  • 性能高

Athena的困难

  • 在Athena的查询执行时间可能会有很大的变化,对于15GB的数据,从60秒到2500秒不等。
  • 需要花了一点时间来处理我们做的一些小的日志格式变化。

Athena接口

  • AWS web控制台:SDK通过提供包括Amazon S3、EC2等许多AWS服务的API,帮助降低了编码的复杂性。例如,BOTO3 for Python。
  • JDBC 驱动程序:JDBC 软件开发者工具包 (SDK),提供了用于故障排除和调试 JDBC 应用程序的工具。

在Python中使用Athena

#Using AWS console
import boto3
import json
import time
#A session stores configuration state and allows you to create service clients and resources.
session = boto3.Session(profile_name='THE-NAME-OF-PROFILE')
athena = session.client('athena')

response = athena.start_query_execution(
    QueryString='SELECT * FROM DATABASE.TABLE LIMIT 5',
    QueryExecutionContext={
        'Database': 'YOUR-S3-DATABASE-NAME'
    },
    ResultConfiguration={
        'OutputLocation': 'YOUR-OUTPUT-S3-TABLE-NAME',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_S3'
        }
    }
)

while True:
    try:
        query_results = athena.get_query_results(
            QueryExecutionId=response['QueryExecutionId']
        )
        break
    except Exception as err:
        if 'Query has not yet finished' in err.message:
            time.sleep(3)
        else:
            raise(err)

print(json.dumps(query_results['ResultSet']['Rows'], indent=4, sort_keys=False))

#Using JDBC
import os
import configparser
import pyathenajdbc

# Get aws credentials
aws_config_file = '~/.aws/credentials'

Config = configparser.ConfigParser()
Config.read(os.path.expanduser(aws_config_file))

access_key_id = Config['CONFIG-FILE']['aws_access_key_id']
secret_key_id = Config['CONFIG-FILE']['aws_secret_access_key']

class PyAthenaLoader():

    def connect(self):
        self.conn = pyathenajdbc.connect(
            s3_staging_dir="YOUR-OUTPUT-S3-TABLE-NAME",
            access_key=access_key_id,
            secret_key=secret_key_id,
            region_name="REGION-NAME"
        )

    def query(self, req):
        self.connect()

        try:
            with self.conn.cursor() as cursor:
                cursor.execute(req)
                res = cursor.fetchall()
        except Exception as X:
            return X
        finally:
            self.conn.close()
        return res

athena = PyAthenaLoader()
print(athena.query('SELECT * FROM DATABASE.TABLE LIMIT 5;'))

 类似资料: