当前位置: 首页 > 工具软件 > pyhs > 使用案例 >

Python 连接 Hive:在 Python 中使用 pyhs2 连接 HiveServer2 查询数据

冯阳成
2023-12-01

对 pyhs2 做一层封装,让它用起来和 torndb 一样简单好用。

提供 db.query、db.get、db.execute 三个方法,用法和 torndb 完全一样。

query 和 get 的返回值类型也与 torndb 完全一样;唯一不同的是 execute 方法:torndb 中会返回 lastrowid,而本文的实现返回 None。

# coding: u8

import pyhs2

from pyhs2.error import Pyhs2Exception

class Row(dict):
    """A dict that allows for object-like property access syntax.

    ``row.name`` is equivalent to ``row['name']``. A missing key raises
    ``AttributeError`` (not ``KeyError``) so that ``getattr``/``hasattr``
    behave the way they do for ordinary objects.
    """

    def __getattr__(self, name):
        # __getattr__ is only consulted after normal attribute lookup fails,
        # so real dict methods (keys, items, ...) are never shadowed by keys.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

class Connection(object):
    """A small torndb-style wrapper around a pyhs2 HiveServer2 connection.

    Provides ``query``, ``get`` and ``execute``. ``query`` and ``get`` return
    ``Row`` objects; ``execute`` returns ``None``.
    """

    def __init__(self, db_host, user, database, port=10000,
                 authMechanism="PLAIN"):
        """Create a connection to HiveServer2 through ``pyhs2.connect``.

        Args:
            db_host: passed to ``pyhs2.connect`` as ``host``.
            user: passed through as ``user``.
            database: passed through as ``database``.
            port: passed through as ``port`` (default 10000).
            authMechanism: passed through as ``authMechanism``
                (default "PLAIN").
        """
        self._closed = False
        self.conn = pyhs2.connect(host=db_host,
                                  port=port,
                                  authMechanism=authMechanism,
                                  user=user,
                                  database=database,
                                  )

    def query(self, sql):
        """Return a list of ``Row`` objects for the given query."""
        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)
            # Each schema entry is indexed by 'columnName'; those names become
            # the keys of every returned Row.
            column_names = [col['columnName'] for col in cursor.getSchema()]
            return [Row(zip(column_names, row)) for row in cursor]

    def _execute(self, cursor, sql):
        """Execute ``sql`` on ``cursor``.

        On ``Pyhs2Exception`` the connection is closed and the same exception
        is re-raised.
        """
        try:
            return cursor.execute(sql)
        except Pyhs2Exception:
            self.close()
            # Bare ``raise`` keeps the original traceback (``raise e`` would
            # reset it on Python 2).
            raise

    def get(self, sql):
        """Return the (singular) row returned by the given query.

        Returns ``None`` when the query has no results.

        Raises:
            Exception: if the query returns more than one row.
        """
        rows = self.query(sql)
        if not rows:
            return None
        if len(rows) > 1:
            raise Exception("Multiple rows returned for get() query")
        return rows[0]

    def execute(self, sql):
        """Execute the given statement, returning None."""
        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)

    def close(self):
        """Close the underlying connection; safe to call more than once.

        ``_execute`` closes the connection on error and ``__del__`` closes it
        again at garbage collection, so the underlying ``conn.close()`` must
        only ever be invoked once.
        """
        # getattr defaults cover instances whose __init__ did not complete.
        if getattr(self, '_closed', False) or not hasattr(self, 'conn'):
            return
        # Mark closed first so that a failing close is not retried by __del__.
        self._closed = True
        self.conn.close()

    def __del__(self):
        self.close()

def test_query(db):
    """Demo: run a multi-row SELECT through ``db.query`` and print each row.

    Args:
        db: an object exposing ``query(sql)`` that returns an iterable of rows.
    """
    sql = """
        SELECT platform, dt
        FROM mydb.pv_log
        WHERE dt='20170827'
        LIMIT 3
    """
    for row in db.query(sql):
        print(row)

def test_get(db):
    """Demo: fetch a single row through ``db.get`` and print it.

    Args:
        db: an object exposing ``get(sql)`` that returns one row or ``None``.
    """
    # get() should only return one row, else it will raise an error
    sql = """
        SELECT platform, dt
        FROM mydb.pv_log
        WHERE dt='20170829'
        LIMIT 1
    """
    row = db.get(sql)
    print(row)
    if row is None:
        # get() returns None when the query matches nothing; accessing
        # row.dt below would otherwise raise AttributeError.
        return
    # row is a dict-like object: attribute access and key access both work
    print(row.dt)
    print(row['dt'])

def test_execute(db):
    """Placeholder demo for ``db.execute``; intentionally does nothing.

    Args:
        db: unused here; kept so the signature matches the other demos.
    """
    # `UPDATE` or `DELETE` statements are DANGEROUS, so the example is left
    # commented out:
    #
    # sql = """
    #     UPDATE xxx
    # """
    # db.execute(sql)
    pass

def main():
    """Open a connection to a local HiveServer2 and run the three demos."""
    host = '127.0.0.1'
    db = Connection(db_host=host, port=10000, user='myuser',
                    database='mydb', authMechanism='PLAIN')
    test_query(db)
    test_get(db)
    test_execute(db)


if __name__ == '__main__':
    main()

 类似资料: