Python Web Flask-SQLAlchemy

徐新荣
2023-12-01

Flask-SQLAlchemy

App启动类 编写接口部分

# -*- coding: utf-8 -*-
# @Time    : 2022/6/26 20:07
# @Author  : ChowRunFa
# @File    : app.py
# @Software: PyCharm
import datetime
from flask import Flask, jsonify, request, make_response
from flask_sqlalchemy import SQLAlchemy

from models import *
from server import MySQLConfig

# Create the Flask application and load the database settings from MySQLConfig.
app = Flask(__name__)
app.config.from_object(MySQLConfig)
# Bind the shared ``db`` extension to this app and create any tables that do
# not exist yet; create_all() needs an active application context.
with app.app_context():
	db.init_app(app)
	db.create_all()

@app.route("/house",methods=["GET"])
def getHouse():
    """Return every row of the ``housing`` table as a JSON document.

    Returns:
        The string produced by ``result(200, {'house': [...]})``, where each
        list element is a dict holding one row's loaded attributes.
    """
    data = []
    for house in housing.query.all():
        # Build a filtered copy instead of deleting '_sa_instance_state' from
        # the live __dict__: removing that key strips SQLAlchemy's bookkeeping
        # from the instance and leaves it unusable for any later ORM access.
        row = {key: value for key, value in vars(house).items()
               if key != '_sa_instance_state'}
        data.append(row)
    return result(200, {'house': data})

@app.route("/history",methods=["GET"])
def getHistory():
    """Return every row of the ``history`` table as a JSON document.

    Returns:
        The string produced by ``result(200, {'hist': [...]})``, where each
        list element is a dict holding one row's loaded attributes.
    """
    data = []
    for record in history.query.all():
        # Build a filtered copy instead of deleting '_sa_instance_state' from
        # the live __dict__: removing that key strips SQLAlchemy's bookkeeping
        # from the instance and leaves it unusable for any later ORM access.
        row = {key: value for key, value in vars(record).items()
               if key != '_sa_instance_state'}
        data.append(row)
    return result(200, {'hist': data})


# Start Flask's built-in development server when this file is run directly.
# NOTE(review): debug=True enables the interactive debugger and the reloader;
# do not use it for a publicly reachable deployment.
if __name__ == "__main__":
	app.run(debug=True)

数据库表实体类

# -*- coding: utf-8 -*-
# @Time    : 2022/6/26 16:22
# @Author  : ChowRunFa
# @File    : models.py
# @Software: PyCharm
from flask_sqlalchemy import SQLAlchemy
import simplejson as json#pip install simplejson 或者 pip install python-simplejson

# Shared Flask-SQLAlchemy extension object; the models below derive from
# ``db.Model``. It is created unbound here (no app is passed to the constructor).
db = SQLAlchemy()

def result(code=200,d={}):
    """Wrap a status code and a payload in a JSON envelope.

    Args:
        code: Status code stored under the ``'code'`` key.
        d: Payload stored under the ``'data'`` key. The default dict is only
            read, never mutated, so sharing it across calls is harmless.

    Returns:
        The JSON text of ``{'code': code, 'data': d}``; non-ASCII characters
        are emitted as-is rather than as ``\\uXXXX`` escapes.
    """
    envelope = {'code': code, 'data': d}
    return json.dumps(envelope, ensure_ascii=False)


class housing(db.Model):
	"""ORM model mapped to the ``housing`` table.

	Every column is declared as ``String(256)``; ``name`` is the primary key.
	"""
	__tablename__ = "housing"
	# Collation override left disabled here (the ``history`` model enables it):
	# __table_args__ = {'mysql_collate':'utf8_general_ci'}

	# Primary key.
	name = db.Column(db.String(256),primary_key=True)
	addr = db.Column(db.String(256))
	price = db.Column(db.String(256))
	# NOTE: this shadows the builtin ``type`` inside the class body only;
	# renaming it would change the attribute name that callers rely on.
	type = db.Column(db.String(256))
	area = db.Column(db.String(256))
	feature = db.Column(db.String(256))


class history(db.Model):
	"""ORM model mapped to the ``history`` table.

	``addr`` and ``time`` together form a composite primary key; every
	column is declared as ``String(256)``.
	"""
	__tablename__ = "history"
	# MySQL-specific table option: request the utf8_general_ci collation.
	__table_args__ = {'mysql_collate':'utf8_general_ci'}

	# Composite primary key: (addr, time).
	addr = db.Column(db.String(256),primary_key=True)
	time = db.Column(db.String(256),primary_key=True)
	price = db.Column(db.String(256))
	rate = db.Column(db.String(256))

数据库配置类

# -*- coding: utf-8 -*-
# @Time    : 2022/6/25 11:50
# @Author  : ChowRunFa
# @File    : server.py
# @Software: PyCharm
import pymysql
from dbConfig import *
import time
import logging
import traceback
import pymysql.cursors

#mysql settings
class MySQLConfig(object):
    """Flask configuration object holding the Flask-SQLAlchemy settings.

    NOTE(review): the database credentials are hard-coded in the URI below.
    """

    # Connection URI assembled from its individual parts.
    SQLALCHEMY_DATABASE_URI = (
        "mysql+pymysql://{username}:{password}@{ipaddress}:{port}/{database}".format(
            database="jx_db4",
            port="3306",
            ipaddress="127.0.0.1",
            password="123456",
            username="root",
        )
    )
    # Enable modification tracking.
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # Echo the SQL statements issued by the engine to the log.
    SQLALCHEMY_ECHO = True

# WHITE_NAME_LIST = [""]


class Connection(object):
    """A lightweight wrapper around a single PyMySQL connection.

    Host, port, user, password, database name and charset are read from the
    ``MYSQL_*`` module-level names (presumably supplied by the ``dbConfig``
    star import), not from constructor arguments. The connection is put in
    autocommit mode and its cursors are ``DictCursor`` instances.

    SQL safety: the ``table_*`` helpers splice ``table_name`` and field names
    directly into the statement text (identifiers cannot be bound as
    parameters), so those must never come from untrusted input. Values are
    always handed to the driver as bound parameters.
    """

    def __init__(self, max_idle_time=7 * 3600, connect_timeout=10,
                 time_zone="+0:00", charset="utf8mb4", sql_mode="TRADITIONAL"):
        """Store the connection settings and try to connect once.

        Args:
            max_idle_time: Seconds of inactivity after which the connection
                is re-opened before its next use.
            connect_timeout: Forwarded to ``pymysql.connect``.
            time_zone: Session time zone, applied through ``init_command``.
            charset: NOTE(review): currently ignored; the connection uses
                ``MYSQL_CHARSET`` instead. Left unchanged so existing
                deployments keep their charset.
            sql_mode: Forwarded to ``pymysql.connect``.

        A failed initial connection is logged, not raised; ``_db`` then
        stays ``None`` and the next operation attempts to reconnect.
        """
        self.host = MYSQL_HOST
        self.database = MYSQL_DBNAME
        self.max_idle_time = float(max_idle_time)

        args = dict(use_unicode=True, charset=MYSQL_CHARSET,
                    database=MYSQL_DBNAME,
                    init_command=('SET time_zone = "%s"' % time_zone),
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        args["user"] = MYSQL_USER
        args["passwd"] = MYSQL_PASSWORD
        args["host"] = MYSQL_HOST
        args["port"] = MYSQL_PORT
        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def _ensure_connected(self):
        """Reconnect if there is no connection or it has idled too long."""
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
                (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        """Return a new cursor on a connection that is known to be fresh."""
        self._ensure_connected()
        return self._db.cursor()

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        db = getattr(self, "_db", None)
        if db is not None:
            try:
                db.close()
            finally:
                # Drop the handle even if close() raised, so a later
                # reconnect() does not trip over the same dead connection.
                self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        self._db = pymysql.connect(**self._db_args)
        self._db.autocommit(True)

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchall()
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.

        Only the first row is fetched; the result is ``None`` when the
        query matches nothing.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchone()
        finally:
            cursor.close()

    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query.

        A duplicate-key error (code 1062) is deliberately swallowed and
        yields ``None``. Any other exception has its traceback printed and
        is re-raised.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.lastrowid
        except Exception as e:
            # Guard e.args: an exception raised without arguments must not
            # turn into an IndexError that hides the real failure.
            if e.args and e.args[0] == 1062:
                return None
            traceback.print_exc()
            raise
        finally:
            cursor.close()

    insert = execute

    ## =============== high level method for table ===================

    def table_has(self, table_name, field, value):
        """Return the first row of `table_name` whose `field` equals `value`.

        Args:
            table_name: Table to search (trusted identifier).
            field: Column to select and compare (trusted identifier).
            value: Value to look for; sent as a bound parameter.

        Returns:
            The row from ``get`` (containing only `field`), or ``None`` when
            no row matches.
        """
        # The value is bound by the driver instead of being formatted into
        # the SQL text: formatting encoded bytes produced a literal
        # "b'...'" on Python 3 and broke on quotes in the value.
        sql = 'SELECT %s FROM %s WHERE %s=%%s' % (field, table_name, field)
        return self.get(sql, value)

    def table_insert(self, table_name, item):
        '''Insert one row; item is a dict : key is mysql table field.

        Returns the lastrowid reported by ``execute``, or ``None`` when the
        row was skipped because of a duplicate-key error (code 1062). Other
        errors are printed together with the statement and values, then
        re-raised.
        '''
        fields = list(item.keys())
        # Values stay as str: the driver encodes them with the connection
        # charset, so pre-encoding to bytes is unnecessary.
        values = list(item.values())
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
        try:
            return self.execute(sql, *values)
        except Exception as e:
            if e.args and e.args[0] == 1062:
                # just skip duplicated item
                return None
            traceback.print_exc()
            print('sql:', sql)
            print('item:')
            for name, value in zip(fields, values):
                vs = str(value)
                # Very long values are summarised by their length only.
                if len(vs) > 300:
                    print(name, ' : ', len(vs), type(value))
                else:
                    print(name, ' : ', vs, type(value))
            raise

    def table_update(self, table_name, updates,
                     field_where, value_where):
        '''Update the rows of `table_name` where `field_where` == `value_where`.

        updates is a dict of {field_update:value_update}. An empty dict is a
        no-op, because ``UPDATE ... SET`` with no assignments is not valid SQL.
        '''
        if not updates:
            return
        assignments = ','.join('%s=%%s' % k for k in updates)
        values = list(updates.values())
        # The WHERE value is bound like the SET values, so quotes or '%'
        # inside it cannot corrupt the statement.
        values.append(value_where)
        sql = 'UPDATE %s SET %s WHERE %s=%%s' % (
            table_name,
            assignments,
            field_where,
        )
        self.execute(sql, *values)
 类似资料: