PyAthena is a Python DB API 2.0 (PEP 249) client for Amazon Athena.
$ pip install PyAthena
Extra packages:
Package | Install command | Version |
---|---|---|
Pandas | pip install PyAthena[Pandas] |
>=1.0.0 |
SQLAlchemy | pip install PyAthena[SQLAlchemy] |
>=1.0.0, <2.0.0 |
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
print(row)
Supported DB API paramstyle is only PyFormat
.PyFormat
only supports named placeholders with old %
operator style and parameters specify dictionary format.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s
""", {"param": "a string"})
print(cursor.fetchall())
if %
character is contained in your query, it must be escaped with %%
like the following:
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
Install SQLAlchemy with pip install "SQLAlchemy>=1.0.0, <2.0.0"
or pip install PyAthena[SQLAlchemy]
.Supported SQLAlchemy is 1.0.0 or higher and less than 2.0.0.
from urllib.parse import quote_plus # PY2: from urllib import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData
conn_str = "awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/"\
"{schema_name}?s3_staging_dir={s3_staging_dir}"
engine = create_engine(conn_str.format(
aws_access_key_id=quote_plus("YOUR_ACCESS_KEY_ID"),
aws_secret_access_key=quote_plus("YOUR_SECRET_ACCESS_KEY"),
region_name="us-west-2",
schema_name="default",
s3_staging_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/")))
many_rows = Table("many_rows", MetaData(bind=engine), autoload=True)
print(select([func.count("*")], from_obj=many_rows).scalar())
The connection string has the following format:
awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...
If you do not specify aws_access_key_id
and aws_secret_access_key
using instance profile or boto3 configuration file:
awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...
NOTE: s3_staging_dir
requires quote. If aws_access_key_id
, aws_secret_access_key
and other parameter contain special characters, quote is also required.
You can use the pandas.read_sql_query to handle the query results as a DataFrame object.
from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2")
df = pd.read_sql_query("SELECT * FROM many_rows", conn)
print(df.head())
NOTE: Poor performance when using pandas.read_sql #222
The pyathena.pandas.util
package also has helper methods.
from pyathena import connect
from pyathena.pandas.util import as_pandas
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM many_rows")
df = as_pandas(cursor)
print(df.describe())
If you want to use the query results output to S3 directly, you can use PandasCursor.This cursor fetches query results faster than the default cursor. (See benchmark results.)
You can use pandas.DataFrame.to_sql to write records stored in DataFrame to Amazon Athena.pandas.DataFrame.to_sql uses SQLAlchemy, so you need to install it.
import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine
conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/"\
"{schema_name}?s3_staging_dir={s3_staging_dir}&s3_dir={s3_dir}&compression=snappy"
engine = create_engine(conn_str.format(
region_name="us-west-2",
schema_name="YOUR_SCHEMA",
s3_staging_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/"),
s3_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/")))
df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
df.to_sql("YOUR_TABLE", engine, schema="YOUR_SCHEMA", index=False, if_exists="replace", method="multi")
The location of the Amazon S3 table is specified by the s3_dir
parameter in the connection string.If s3_dir
is not specified, s3_staging_dir
parameter will be used. The following rules apply.
s3://{s3_dir or s3_staging_dir}/{schema}/{table}/
The data format only supports Parquet. The compression format is specified by the compression
parameter in the connection string.
The pyathena.pandas.util
package also has helper methods.
import pandas as pd
from pyathena import connect
from pyathena.pandas.util import to_sql
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2")
df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
to_sql(df, "YOUR_TABLE", conn, "s3://YOUR_S3_BUCKET/path/to/",
schema="YOUR_SCHEMA", index=False, if_exists="replace")
This helper method supports partitioning.
import pandas as pd
from datetime import date
from pyathena import connect
from pyathena.pandas.util import to_sql
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2")
df = pd.DataFrame({
"a": [1, 2, 3, 4, 5],
"dt": [
date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1),
date(2020, 1, 2),
date(2020, 1, 3)
],
})
to_sql(df, "YOUR_TABLE", conn, "s3://YOUR_S3_BUCKET/path/to/",
schema="YOUR_SCHEMA", partitions=["dt"])
cursor = conn.cursor()
cursor.execute("SHOW PARTITIONS YOUR_TABLE")
print(cursor.fetchall())
Conversion to Parquet and upload to S3 use ThreadPoolExecutor by default.It is also possible to use ProcessPoolExecutor.
import pandas as pd
from concurrent.futures.process import ProcessPoolExecutor
from pyathena import connect
from pyathena.pandas.util import to_sql
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2")
df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
to_sql(df, "YOUR_TABLE", conn, "s3://YOUR_S3_BUCKET/path/to/",
schema="YOUR_SCHEMA", index=False, if_exists="replace",
chunksize=1, executor_class=ProcessPoolExecutor, max_workers=5)
DictCursor retrieve the query execution result as a dictionary type with column names and values.
You can use the DictCursor by specifying the cursor_class
with the connect method or connection object.
from pyathena import connect
from pyathena.cursor import DictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=DictCursor).cursor()
from pyathena.connection import Connection
from pyathena.cursor import DictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=DictCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object's cursor method.
from pyathena import connect
from pyathena.cursor import DictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(DictCursor)
from pyathena.connection import Connection
from pyathena.cursor import DictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(DictCursor)
The basic usage is the same as the Cursor.
from pyathena.connection import Connection
from pyathena.cursor import DictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(DictCursor)
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
print(row["a"])
If you want to change the dictionary type (e.g., use OrderedDict), you can specify like the following.
from collections import OrderedDict
from pyathena import connect
from pyathena.cursor import DictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=DictCursor).cursor(dict_type=OrderedDict)
from collections import OrderedDict
from pyathena import connect
from pyathena.cursor import DictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(cursor=DictCursor, dict_type=OrderedDict)
AsynchronousCursor is a simple implementation using the concurrent.futures package.This cursor does not follow the DB API 2.0 (PEP 249).
You can use the AsynchronousCursor by specifying the cursor_class
with the connect method or connection object.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object's cursor method.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncCursor)
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncCursor)
The default number of workers is 5 or cpu number * 5.If you want to change the number of workers you can specify like the following.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor(max_workers=10)
The execute method of the AsynchronousCursor returns the tuple of the query ID and the future object.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
The return value of the future object is an AthenaResultSet
object.This object has an interface that can fetch and iterate query results similar to synchronous cursors.It also has information on the result of query execution.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.state)
print(result_set.state_change_reason)
print(result_set.completion_date_time)
print(result_set.submission_date_time)
print(result_set.data_scanned_in_bytes)
print(result_set.engine_execution_time_in_millis)
print(result_set.query_queue_time_in_millis)
print(result_set.total_execution_time_in_millis)
print(result_set.query_planning_time_in_millis)
print(result_set.service_processing_time_in_millis)
print(result_set.output_location)
print(result_set.description)
for row in result_set:
print(row)
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.fetchall())
A query ID is required to cancel a query with the AsynchronousCursor.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)
NOTE: The cancel method of the future object does not cancel the query.
AsyncDIctCursor is an AsyncCursor that can retrieve the query execution resultas a dictionary type with column names and values.
You can use the DictCursor by specifying the cursor_class
with the connect method or connection object.
from pyathena import connect
from pyathena.async_cursor import AsyncDictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncDictCursor).cursor()
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncDictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncDictCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object's cursor method.
from pyathena import connect
from pyathena.async_cursor import AsyncDictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncDictCursor)
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncDictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncDictCursor)
The basic usage is the same as the AsyncCursor.
from pyathena.connection import Connection
from pyathena.cursor import DictCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncDictCursor)
query_id, future = cursor.execute("SELECT * FROM many_rows LIMIT 10")
result_set = future.result()
for row in result_set:
print(row["a"])
If you want to change the dictionary type (e.g., use OrderedDict), you can specify like the following.
from collections import OrderedDict
from pyathena import connect
from pyathena.async_cursor import AsyncDictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncDictCursor).cursor(dict_type=OrderedDict)
from collections import OrderedDict
from pyathena import connect
from pyathena.async_cursor import AsyncDictCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(cursor=AsyncDictCursor, dict_type=OrderedDict)
PandasCursor directly handles the CSV file of the query execution result output to S3.This cursor is to download the CSV file after executing the query, and then loaded into DataFrame object.Performance is better than fetching data with Cursor.
You can use the PandasCursor by specifying the cursor_class
with the connect method or connection object.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
from pyathena.connection import Connection
from pyathena.pandas.cursor import PandasCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object's cursor method.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(PandasCursor)
from pyathena.connection import Connection
from pyathena.pandas.cursor import PandasCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(PandasCursor)
The as_pandas method returns a DataFrame object.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
df = cursor.execute("SELECT * FROM many_rows").as_pandas()
print(df.describe())
print(df.head())
Support fetch and iterate query results.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
cursor.execute("SELECT * FROM many_rows")
print(cursor.fetchone())
print(cursor.fetchmany())
print(cursor.fetchall())
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
cursor.execute("SELECT * FROM many_rows")
for row in cursor:
print(row)
The DATE and TIMESTAMP of Athena's data type are returned as pandas.Timestamp type.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
cursor.execute("SELECT col_timestamp FROM one_row_complex")
print(type(cursor.fetchone()[0])) # <class 'pandas._libs.tslibs.timestamps.Timestamp'>
Execution information of the query can also be retrieved.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
cursor.execute("SELECT * FROM many_rows")
print(cursor.state)
print(cursor.state_change_reason)
print(cursor.completion_date_time)
print(cursor.submission_date_time)
print(cursor.data_scanned_in_bytes)
print(cursor.engine_execution_time_in_millis)
print(cursor.query_queue_time_in_millis)
print(cursor.total_execution_time_in_millis)
print(cursor.query_planning_time_in_millis)
print(cursor.service_processing_time_in_millis)
print(cursor.output_location)
If you want to customize the Dataframe object dtypes and converters, create a converter class like this:
from pyathena.converter import Converter
class CustomPandasTypeConverter(Converter):
def __init__(self):
super(CustomPandasTypeConverter, self).__init__(
mappings=None,
types={
"boolean": object,
"tinyint": float,
"smallint": float,
"integer": float,
"bigint": float,
"float": float,
"real": float,
"double": float,
"decimal": float,
"char": str,
"varchar": str,
"array": str,
"map": str,
"row": str,
"varbinary": str,
"json": str,
}
)
def convert(self, type_, value):
# Not used in PandasCursor.
pass
Specify the combination of converter functions in the mappings argument and the dtypes combination in the types argument.
Then you simply specify an instance of this class in the convertes argument when creating a connection or cursor.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(PandasCursor, converter=CustomPandasTypeConverter())
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
converter=CustomPandasTypeConverter()).cursor(PandasCursor)
If you want to change the NaN behavior of Pandas Dataframe,you can do so by using the keep_default_na
, na_values
and quoting
arguments of the cursor object's execute method.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=PandasCursor).cursor()
df = cursor.execute("SELECT * FROM many_rows",
keep_default_na=False,
na_values=[""]).as_pandas()
NOTE: PandasCursor handles the CSV file on memory. Pay attention to the memory capacity.
AsyncPandasCursor is an AsyncCursor that can handle Pandas DataFrame.This cursor directly handles the CSV of query results output to S3 in the same way as PandasCursor.
You can use the AsyncPandasCursor by specifying the cursor_class
with the connect method or connection object.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
from pyathena.connection import Connection
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object's cursor method.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncPandasCursor)
from pyathena.connection import Connection
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = Connection(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor(AsyncPandasCursor)
The default number of workers is 5 or cpu number * 5.If you want to change the number of workers you can specify like the following.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor(max_workers=10)
The execute method of the AsynchronousPandasCursor returns the tuple of the query ID and the future object.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
The return value of the future object is an AthenaPandasResultSet
object.This object has an interface similar to AthenaResultSetObject
.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.state)
print(result_set.state_change_reason)
print(result_set.completion_date_time)
print(result_set.submission_date_time)
print(result_set.data_scanned_in_bytes)
print(result_set.engine_execution_time_in_millis)
print(result_set.query_queue_time_in_millis)
print(result_set.total_execution_time_in_millis)
print(result_set.query_planning_time_in_millis)
print(result_set.service_processing_time_in_millis)
print(result_set.output_location)
print(result_set.description)
for row in result_set:
print(row)
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.fetchall())
This object also has an as_pandas method that returns a DataFrame object similar to the PandasCursor.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
df = result_set.as_pandas()
print(df.describe())
print(df.head())
The DATE and TIMESTAMP of Athena's data type are returned as pandas.Timestamp type.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT col_timestamp FROM one_row_complex")
result_set = future.result()
print(type(result_set.fetchone()[0])) # <class 'pandas._libs.tslibs.timestamps.Timestamp'>
As with AsynchronousCursor, you need a query ID to cancel a query.
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)
You can attempt to re-use the results from a previously executed query to help save time and money in the cases where your underlying data isn't changing.Set the cache_size
or cache_expiration_time
parameter of cursor.execute()
to a number larger than 0 to enable caching.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row") # run once
print(cursor.query_id)
cursor.execute("SELECT * FROM one_row", cache_size=10) # re-use earlier results
print(cursor.query_id) # You should expect to see the same Query ID
The unit of expiration_time
is seconds. To use the results of queries executed up to one hour ago, specify like the following.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row", cache_expiration_time=3600) # Use queries executed within 1 hour as cache.
If cache_size
is not specified, the value of sys.maxsize
will be automatically set and all query results executed up to one hour ago will be checked.Therefore, it is recommended to specify cache_expiration_time
together with cache_size
like the following.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row", cache_size=100, cache_expiration_time=3600) # Use the last 100 queries within 1 hour as cache.
Results will only be re-used if the query strings match exactly,and the query was a DML statement (the assumption being that you always want to re-run queries like CREATE TABLE
and DROP TABLE
).
The S3 staging directory is not checked, so it's possible that the location of the results is not in your provided s3_staging_dir
.
Support Boto3 credentials.
Additional environment variable:
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
$ export AWS_ATHENA_WORK_GROUP=YOUR_WORK_GROUP
from pyathena import connect
cursor = connect(aws_access_key_id="YOUR_ACCESS_KEY_ID",
aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
from pyathena import connect
cursor = connect(aws_access_key_id="YOUR_ACCESS_KEY_ID",
aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
aws_session_token="YOUR_SESSION_TOKEN",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
You will be prompted to enter the MFA code.The program execution will be blocked until the MFA code is entered.
from pyathena import connect
cursor = connect(duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
The shared credentials file has a default location of ~/.aws/credentials.
If you use the default profile, there is no need to specify credential information.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
You can also specify a profile other than the default.
from pyathena import connect
cursor = connect(profile_name="YOUR_PROFILE_NAME",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
from pyathena import connect
cursor = connect(role_arn="YOUR_ASSUME_ROLE_ARN",
role_session_name="PyAthena-session",
duration_seconds=3600,
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
You will be prompted to enter the MFA code.The program execution will be blocked until the MFA code is entered.
from pyathena import connect
cursor = connect(role_arn="YOUR_ASSUME_ROLE_ARN",
role_session_name="PyAthena-session",
duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
No need to specify credential information.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
Depends on the following environment variables:
$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ export AWS_DEFAULT_REGION=us-west-2
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
And you need to create a workgroup named test-pyathena
with the Query result location
configuration.
$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ poetry run pytest
$ poetry run scripts/test_data/delete_test_data.sh
$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ pyenv local 3.9.1 3.8.2 3.7.2 3.6.8
$ poetry run tox
$ poetry run scripts/test_data/delete_test_data.sh
The code formatting uses black and isort.
$ make fmt
$ make chk
下面帖子的内容基于网上其它博友的帖子,原帖子的代码执行失败,估计与版本有关。在原帖子的基础上进行了修改。原帖子的链接为:aws之athena使用_weixin_43843414的博客-CSDN博客 # cat test1.py #coding:utf-8 #Using JDBC import os import configparser import pyathenajdbc # Get aws
什么是Amazon Athena Amazon Athena是一个交互式查询服务,可以直接在Amazon Simple Storage Service (Amazon S3)中使用SQL进行数据分析。这个工具可以通过AWS管理控制台,以及ODBC驱动或API访问。如果你在S3中存储了海量数据,而且你有SQL技能,直接使用Athena来快速分析海量数据集是非常理想的选择。 Athena的好处 轻松地