Flask—SQLAlchemy
App启动类 编写接口部分
# -*- coding: utf-8 -*-
# @Time : 2022/6/26 20:07
# @Author : ChowRunFa
# @File : app.py
# @Software: PyCharm
import datetime
from flask import Flask, jsonify, request, make_response
from flask_sqlalchemy import SQLAlchemy
from models import *
from server import MySQLConfig
app = Flask(__name__)
app.config.from_object(MySQLConfig)
with app.app_context():
db.init_app(app)
db.create_all()
@app.route("/house",methods=["GET"])
def getHouse():
houses = housing.query.all()
data = []
for house in houses:
house = house.__dict__
del house['_sa_instance_state']
data.append(house)
return result(200, {'house':data})
@app.route("/history",methods=["GET"])
def getHistory():
historires = history.query.all()
data = []
for hist in historires:
hist = hist.__dict__
del hist['_sa_instance_state']
data.append(hist)
return result(200, {'hist':data})
if __name__ == "__main__":
app.run(debug=True)
数据库表实体类
# -*- coding: utf-8 -*-
# @Time : 2022/6/26 16:22
# @Author : ChowRunFa
# @File : models.py
# @Software: PyCharm
from flask_sqlalchemy import SQLAlchemy
import simplejson as json#pip install simplejson 或者 pip install python-simplejson
db = SQLAlchemy()
def result(code=200,d={}):
data = dict()#object.__dict__
data['code'] = code
data['data'] = d
return json.dumps(data,ensure_ascii=False)
class housing(db.Model):
__tablename__ = "housing"
# __table_args__ = {'mysql_collate':'utf8_general_ci'}
name = db.Column(db.String(256),primary_key=True)
addr = db.Column(db.String(256))
price = db.Column(db.String(256))
type = db.Column(db.String(256))
area = db.Column(db.String(256))
feature = db.Column(db.String(256))
class history(db.Model):
__tablename__ = "history"
__table_args__ = {'mysql_collate':'utf8_general_ci'}
addr = db.Column(db.String(256),primary_key=True)
time = db.Column(db.String(256),primary_key=True)
price = db.Column(db.String(256))
rate = db.Column(db.String(256))
数据库配置类
# -*- coding: utf-8 -*-
# @Time : 2022/6/25 11:50
# @Author : ChowRunFa
# @File : server.py.py
# @Software: PyCharm
import pymysql
from dbConfig import *
import time
import logging
import traceback
import pymysql.cursors
#mysql settings
class MySQLConfig(object):
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{username}:{password}@{ipaddress}:{port}/{database}"\
.format(username="root",password="123456",ipaddress="127.0.0.1",port="3306",database="jx_db4")
SQLALCHEMY_TRACK_MODIFICATIONS = True#动态追踪修改设置
SQLALCHEMY_ECHO = True
# WHITE_NAME_LIST = [""]
class Connection(object):
"""A lightweight wrapper around PyMySQL.
"""
def __init__(self,max_idle_time=7 * 3600, connect_timeout=10,
time_zone="+0:00", charset = "utf8mb4", sql_mode="TRADITIONAL"):
self.host = MYSQL_HOST
self.database = MYSQL_DBNAME
self.max_idle_time = float(max_idle_time)
args = dict(use_unicode=True, charset=MYSQL_CHARSET,
database=MYSQL_DBNAME,
init_command=('SET time_zone = "%s"' % time_zone),
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=connect_timeout, sql_mode=sql_mode)
args["user"] = MYSQL_USER
args["passwd"] = MYSQL_PASSWORD
# We accept a path to a MySQL socket file or a host(:port) string
args["host"] = MYSQL_HOST
args["port"] = MYSQL_PORT
self._db = None
self._db_args = args
self._last_use_time = time.time()
try:
self.reconnect()
except Exception:
logging.error("Cannot connect to MySQL on %s", self.host,
exc_info=True)
def _ensure_connected(self):
# Mysql by default closes client connections that are idle for
# 8 hours, but the client library does not report this fact until
# you try to perform a query and it fails. Protect against this
# case by preemptively closing and reopening the connection
# if it has been idle for too long (7 hours by default).
if (self._db is None or
(time.time() - self._last_use_time > self.max_idle_time)):
self.reconnect()
self._last_use_time = time.time()
def _cursor(self):
self._ensure_connected()
return self._db.cursor()
def __del__(self):
self.close()
def close(self):
"""Closes this database connection."""
if getattr(self, "_db", None) is not None:
self._db.close()
self._db = None
def reconnect(self):
"""Closes the existing database connection and re-opens it."""
self.close()
self._db = pymysql.connect(**self._db_args)
self._db.autocommit(True)
def query(self, query, *parameters, **kwparameters):
"""Returns a row list for the given query and parameters."""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
result = cursor.fetchall()
return result
finally:
cursor.close()
def get(self, query, *parameters, **kwparameters):
"""Returns the (singular) row returned by the given query.
"""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
return cursor.fetchone()
finally:
cursor.close()
def execute(self, query, *parameters, **kwparameters):
"""Executes the given query, returning the lastrowid from the query."""
cursor = self._cursor()
try:
cursor.execute(query, kwparameters or parameters)
return cursor.lastrowid
except Exception as e:
if e.args[0] == 1062:
pass
else:
traceback.print_exc()
raise e
finally:
cursor.close()
insert = execute
## =============== high level method for table ===================
def table_has(self, table_name, field, value):
if isinstance(value, str):
value = value.encode('utf8')
sql = 'SELECT %s FROM %s WHERE %s="%s"' % (
field,
table_name,
field,
value)
d = self.get(sql)
return d
def table_insert(self, table_name, item):
'''item is a dict : key is mysql table field'''
fields = list(item.keys())
values = list(item.values())
fieldstr = ','.join(fields)
valstr = ','.join(['%s'] * len(item))
for i in range(len(values)):
if isinstance(values[i], str):
values[i] = values[i].encode('utf8')
sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
try:
last_id = self.execute(sql, *values)
return last_id
except Exception as e:
if e.args[0] == 1062:
# just skip duplicated item
pass
else:
traceback.print_exc()
print('sql:', sql)
print('item:')
for i in range(len(fields)):
vs = str(values[i])
if len(vs) > 300:
print(fields[i], ' : ', len(vs), type(values[i]))
else:
print(fields[i], ' : ', vs, type(values[i]))
raise e
def table_update(self, table_name, updates,
field_where, value_where):
'''updates is a dict of {field_update:value_update}'''
upsets = []
values = []
for k, v in updates.items():
s = '%s=%%s' % k
upsets.append(s)
values.append(v)
upsets = ','.join(upsets)
sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
table_name,
upsets,
field_where, value_where,
)
self.execute(sql, *(values))