欢迎关注原创视频教程
https://edu.csdn.net/course/detail/36074
https://edu.csdn.net/course/detail/35475
公共模块(public_method.py),log模块大家自己处理下,我的就不放上去了
"""
定义公共方法
"""
import os
import io
import time
import json
import hashlib
import base64
import datetime
from urllib import parse
import qrcode
import xmltodict
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from log import Lzlog
def rsa_verify_sign(data_str, sign, secret_key, key_is_file=False, need_import=False):
    """
    Check an RSA PKCS#1 v1.5 signature (SHA-256 digest) over a piece of text.

    :param data_str: text that was signed; it is UTF-8 encoded before hashing
    :param sign: base64-encoded signature, given as text
    :param secret_key: key object, key text, or path to a key file
    :param key_is_file: when True, ``secret_key`` is a path and the key is loaded from it
    :param need_import: when True (and not a file), ``secret_key`` is key text that is imported first
    :return: True when the signature verifies, otherwise False
    """
    key = get_secret(secret_key, key_is_file, need_import)
    verifier = PKCS1_v1_5.new(key)
    message_hash = SHA256.new()
    message_hash.update(data_str.encode("utf8"))
    raw_signature = base64.decodebytes(sign.encode("utf8"))
    return True if verifier.verify(message_hash, raw_signature) else False
def get_secret(secret_key, key_is_file=False, need_import=False):
    """
    Resolve a key argument into the value used for signing or verifying.

    :param secret_key: key material, or the path of a key file
    :param key_is_file: True when ``secret_key`` is a file path
    :param need_import: True when ``secret_key`` is key text that must be imported
    :return: the loaded or imported key; ``secret_key`` itself when it is
             empty or when neither flag is set
    """
    if not secret_key:
        return secret_key
    if key_is_file:
        return read_secret(secret_key)
    if need_import:
        return RSA.importKey(secret_key)
    return secret_key
def read_secret(secret_path, import_key=True):
    """
    Read key material from a file.

    :param secret_path: path of the key file (opened in text mode)
    :param import_key: when True the file text is passed to ``RSA.importKey``;
                       when False the raw text is returned
    :return: the imported key object, or the file content as text
    """
    with open(secret_path, "r") as key_file:
        content = key_file.read()
    if import_key:
        return RSA.importKey(content)
    return content
def rsa_sign(unsigned_string, secret_key, key_is_file=False, need_import=False) -> str:
    """
    Sign data with RSA following PKCS#1 v1.5, using a SHA-256 digest.

    :param unsigned_string: data to sign; ``bytes`` are hashed as given and
                            ``str`` is UTF-8 encoded first
    :param secret_key: private key object, key text, or path to a key file
    :param key_is_file: when True, ``secret_key`` is a path and the key is read from it
    :param need_import: when True (and not a file), ``secret_key`` is key text
                        that is imported with ``RSA.importKey``
    :return: the signature, base64 encoded on a single line; '' when the data
             or the key is empty
    """
    if not (unsigned_string and secret_key):
        return ''
    # The hash is computed over bytes; encode text here so a str argument
    # (which the parameter name suggests) works as well as the bytes that the
    # callers in this file pass.
    if isinstance(unsigned_string, str):
        unsigned_string = unsigned_string.encode("utf-8")
    if key_is_file:
        secret_key = read_secret(secret_key)
    elif need_import:
        secret_key = RSA.importKey(secret_key)
    signature = PKCS1_v1_5.new(secret_key).sign(SHA256.new(unsigned_string))
    # b64encode emits no line breaks, so no newline stripping is needed.
    return base64.b64encode(signature).decode("utf8")
def join_tuple_param(data: list, quote_plus=False) -> str:
    """
    Join (key, value) pairs into a ``key=value&key2=value2`` string.

    :param data: [(key, value), (key2, value2)]
    :param quote_plus: when True every value is URL-encoded with
                       ``urllib.parse.quote_plus`` (so ``&``, ``=`` and spaces
                       are escaped); keys are always written unchanged
    :return: the joined string, or '' when ``data`` is not a list
    """
    if not isinstance(data, list):
        return ''

    def render(value):
        if not quote_plus:
            return value
        # quote_plus only accepts str/bytes; stringify anything else (for
        # example an integer amount) instead of raising TypeError.
        if not isinstance(value, (str, bytes)):
            value = str(value)
        return parse.quote_plus(value)

    return "&".join("{0}={1}".format(k, render(v)) for k, v in data)
def join_tuple_param_alipay(data: list) -> str:
    """
    Render (key, value) pairs as compact JSON-like text with escaped slashes.

    Every value is written as a quoted string, and each ``/`` inside a value
    is preceded by a backslash. No other JSON escaping is applied.

    :param data: [(key, value), (key2, value2)]
    :return: text of the form ``{"key":"value","key2":"value2"}``, or ''
             when ``data`` is not a list
    """
    if not isinstance(data, list):
        return ''
    # '\\/' is an explicit backslash + slash; the previous '\/' literal relied
    # on an invalid escape sequence. str() keeps non-text values from crashing.
    pairs = ('"{0}":"{1}"'.format(k, str(v).replace('/', '\\/')) for k, v in data)
    return "{" + ",".join(pairs) + "}"
def ordered_dict(data: dict) -> list:
    """
    Turn a dict into a key-sorted list of (key, value) pairs.

    Values that are themselves dicts are serialised to compact JSON text
    (no spaces after ``,`` or ``:``). The input dict is left untouched.

    :param data: dict to flatten and sort
    :return: [(key, value), (key2, value2)] sorted by key, or [] when
             ``data`` is not a dict
    """
    if not isinstance(data, dict):
        return []
    # Build a new mapping rather than overwriting nested dicts in the
    # caller's object.
    flattened = {
        key: json.dumps(value, separators=(',', ':')) if isinstance(value, dict) else value
        for key, value in data.items()
    }
    return sorted(flattened.items())
def generate_qr_code(url: str) -> str:
    """
    Render ``url`` as a QR code and return it as an inline PNG data URI.

    :param url: text to encode in the QR code
    :return: ``data:image/png;base64,...`` text, or '' when generation fails
             (the error is logged through ``Lzlog``)
    """
    try:
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=12,
            # NOTE(review): border is normally a whole number of boxes;
            # confirm that a fractional value is intended.
            border=0.1,
        )
        # Add the payload first, then build the matrix, so make(fit=True)
        # sizes the code for the actual data instead of an empty code.
        qr.add_data(url)
        qr.make(fit=True)
        # NOTE(review): light modules on a dark background (inverted colours)
        # - presumably deliberate for the page design; verify scanners cope.
        img = qr.make_image(fill_color="white", back_color="#000000")
        buf = io.BytesIO()
        img.save(buf, format='PNG')
        return 'data:image/png;base64,' + base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        Lzlog.error('二维码生成错误%s' % e)
        return ''
def sign_md5(msg: bytes) -> str:
    """
    Return the MD5 hex digest of ``msg``.

    :param msg: bytes to hash
    :return: lowercase hexadecimal digest, or '' when ``msg`` is not ``bytes``
    """
    if not isinstance(msg, bytes):
        return ''
    return hashlib.md5(msg).hexdigest()
def xml_to_dict(xml_str: str) -> dict:
    """
    Parse XML text with xmltodict and return the result after a JSON round trip.

    :param xml_str: XML document as text
    :return: the parsed document as returned by ``json.loads``
    """
    parsed = xmltodict.parse(xml_str)
    as_json = json.dumps(parsed, indent=4)
    return json.loads(as_json)
def dicttoxml(dict_data: dict) -> str:
    """
    Serialise a dict to pretty-printed XML text via ``xmltodict.unparse``.

    :param dict_data: mapping to serialise
    :return: XML text declared with utf-8 encoding
    """
    xml_text = xmltodict.unparse(dict_data, pretty=True, encoding='utf-8')
    return xml_text
def now_datetime() -> str:
    """Return the current local time, truncated to whole seconds, as ``YYYY-MM-DD HH:MM:SS``."""
    whole_seconds = int(time.time())
    moment = datetime.datetime.fromtimestamp(whole_seconds)
    return moment.isoformat(sep=" ")
def now_date() -> str:
    """Return today's local date as ``YYYY-MM-DD``."""
    today = datetime.date.today()
    return today.isoformat()
def timestamp() -> int:
    """Return the current Unix time in whole seconds (the fraction is truncated)."""
    seconds = time.time()
    return int(seconds)
def long_timestamp() -> str:
    """
    Return the current Unix time as text with the decimal point removed.

    The text comes from ``str(time.time())``, so the number of fractional
    digits (and therefore the total length) can vary between calls.
    """
    text = str(time.time())
    return "".join(text.split("."))
class Dict(dict):
    """
    dict subclass whose entries can also be read and written as attributes.

    Reading an attribute that is not a key yields None instead of raising.
    """

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails; fall back to the
        # mapping and report a missing key as None.
        try:
            return self[name]
        except KeyError:
            return None

    def __setattr__(self, key, value):
        # Attribute assignment stores the value as a dict entry.
        self.__setitem__(key, value)
def catch_error(func):
    """
    Decorator that converts any exception raised by ``func`` into an error payload.

    The wrapped function's return value is passed through unchanged. When it
    raises, the exception is logged through ``Lzlog`` with traceback
    information and a ``dict(code=-1, msg=..., data=None)`` is returned.

    :param func: callable to protect
    :return: wrapper with the same call signature and metadata as ``func``
    """
    import functools

    # wraps() keeps __name__, __qualname__ and __doc__ of the decorated
    # function so logs and introspection still refer to the real function.
    @functools.wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            Lzlog.error(f"{func.__qualname__} 发生异常 {e}", exc_info=1)
            return dict(code=-1, msg='接口内部发生异常', data=None)
    return inner
def param_diff(param: [dict, list], _in: [dict, list]):
    """
    List the items of ``param`` that are missing from ``_in``.

    For a dict argument its keys are used. The result keeps the order in
    which the items appear in ``param`` and contains each missing item once,
    so the output is stable from run to run.

    :param param: required items (dict keys or list elements; must be hashable)
    :param _in: items that are available (dict keys or list elements)
    :return: list of the items of ``param`` not found in ``_in``; [] when
             nothing is missing
    """
    available = set(_in)
    # dict.fromkeys de-duplicates while preserving first-seen order, and
    # iterating a dict yields its keys.
    return [item for item in dict.fromkeys(param) if item not in available]
def get_res(code=0, msg='success', data=None) -> dict:
    """
    Build the standard response payload.

    :param code: status code (0 by default)
    :param msg: status message
    :param data: payload to return to the caller
    :return: dict with the keys ``code``, ``msg`` and ``data``
    """
    return {"code": code, "msg": msg, "data": data}
def file_base_name(abs_path: str) -> str:
    """Return the final component of ``abs_path`` (empty when the path ends with a separator)."""
    _directory, tail = os.path.split(abs_path)
    return tail
if __name__ == '__main__':
    # Manual check: every required name is supplied, so this prints [].
    required_names = ["name", "age", "hobby", "sex"]
    supplied = dict(name=1, age=2, hobby=1, sex=1)
    missing = param_diff(required_names, supplied)
    print(missing)
    # print({1, 2, 3}.difference({1, 2}))
import requests
from config import Conf_Impl
from copy import deepcopy
from log import Lzlog
from public_mthod import timestamp, sign_md5, xml_to_dict, dicttoxml, generate_qr_code, ordered_dict, join_tuple_param, \
read_secret
class WechatPay:
    """
    Minimal WeChat Pay client for NATIVE (QR code) payments.

    NOTE(review): the default credentials are hard-coded in source. They are
    kept so ``WechatPay()`` keeps working, but they should be supplied from
    configuration through the constructor arguments instead.
    """

    UNIFIED_ORDER_URL = "https://api.mch.weixin.qq.com/pay/unifiedorder"

    def __init__(self, app_id="wx8397f8696b538317", mch_id="1473426802",
                 key_secert="T6m9iK73b0kn9g5v426MKfHQH7X8rKwb",
                 notify_url="http://127.0.0.1/notify", timeout=10):
        """
        :param app_id: application ID
        :param mch_id: merchant ID
        :param key_secert: developer secret key appended to the string that is signed
        :param notify_url: URL WeChat calls after a successful payment
        :param timeout: seconds to wait for the unified-order HTTP request
        """
        self.notify_url = notify_url
        self.key_secert = key_secert
        self.app_id = app_id
        self.mch_id = mch_id
        self.timeout = timeout

    def _sign(self, params):
        """
        Compute the MD5 signature for ``params``.

        Follows WeChat Pay's signing rule: the ``sign`` field and parameters
        with empty values are skipped, the rest are sorted by key and joined
        as ``k=v&...``, ``&key=<secret>`` is appended, and the MD5 hex digest
        is upper-cased.

        :param params: request or response parameters
        :return: upper-case hexadecimal signature
        """
        signable = {k: v for k, v in params.items() if k != "sign" and v not in (None, "")}
        unsigned = join_tuple_param(ordered_dict(signable)) + "&key=" + self.key_secert
        return sign_md5(unsigned.encode("utf-8")).upper()

    def create_pay(self, title, number, money):
        """
        Create a NATIVE order and return its payment QR code.

        :param title: product title
        :param number: merchant order number; keep it to match the later notification
        :param money: price in fen (1/100 yuan)
        :return: the value of ``generate_qr_code`` for the returned ``code_url``
                 on success; False on HTTP failure, signature mismatch,
                 a non-SUCCESS result, or any exception (which is logged)
        """
        try:
            body = {
                "appid": self.app_id,
                "mch_id": self.mch_id,
                "nonce_str": str(timestamp()),  # nonce derived from the current time
                "body": title,
                "out_trade_no": number,
                "total_fee": money,
                "spbill_create_ip": "127.0.0.1",
                "notify_url": self.notify_url,
                "trade_type": "NATIVE",  # pre-created order paid by scanning a QR code
            }
            body["sign"] = self._sign(body)
            # The API exchanges XML documents rooted at <xml>.
            request_data = dicttoxml(dict(xml=body))
            response = requests.post(
                url=self.UNIFIED_ORDER_URL,
                data=request_data.encode("utf-8"),
                timeout=self.timeout,  # never block forever on the gateway
            )
            if response.status_code != 200:
                return False
            result = xml_to_dict(response.text)["xml"]
            # Compare signatures to detect a tampered response.
            if result.get("sign") != self._sign(result):
                Lzlog.error("订单创建信息有误:签名不正确")
                return False
            if result.get("return_code") == "SUCCESS" and result.get("result_code") == "SUCCESS":
                return generate_qr_code(result["code_url"])
            return False
        except Exception as e:
            Lzlog.error(f"支付错误 {e}")
            return False

    def verify(self, body_xml):
        """
        Check the signature of a WeChat Pay notification body.

        :param body_xml: raw XML text sent by the WeChat Pay server
        :return: True when the ``sign`` field matches the locally computed
                 signature, otherwise False
        """
        payload = xml_to_dict(body_xml)["xml"]
        sign = payload.get("sign")
        return bool(sign) and sign == self._sign(payload)
# Build the client with its built-in merchant settings.
wechat_pay = WechatPay()
# Create an order; the amount is expressed in fen.
qr_code_url = wechat_pay.create_pay(title="MBP2016", number="201621302139021", money=12900 * 100)
# Call verify() yourself from the endpoint that receives the WeChat callback.
# wechat_pay.verify()
# _*_ coding=utf-8 _*_
from datetime import datetime
from urllib.parse import quote_plus, urlencode
from public_mthod import ordered_dict, join_tuple_param, rsa_sign, read_secret, generate_qr_code, rsa_verify_sign, join_tuple_param_alipay, Dict
from copy import deepcopy
from config import Conf_Impl
import json
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from base64 import encodebytes, decodebytes
import requests
class AliPay(object):
    """
    Alipay client that pre-creates a trade (``alipay.trade.precreate``) and
    returns its QR code, plus helpers for signing and verifying parameters.
    """

    def __init__(self, appid, app_private_key_path, alipay_public_key_path, app_notify_url, return_url,
                 gateway_url=None, timeout=10):
        """
        :param appid: Alipay application ID
        :param app_private_key_path: path of the application's private key file (used to sign)
        :param alipay_public_key_path: path of Alipay's public key file (used to verify)
        :param app_notify_url: default notification URL (stored only)
        :param return_url: default redirect URL (stored only)
        :param gateway_url: Alipay gateway URL; when omitted, the module-level
                            ``alipay_conf.pay_url`` is used
        :param timeout: seconds to wait for the gateway HTTP request
        """
        self.appid = appid
        self.app_key_path = app_private_key_path
        self.alipy_key_path = alipay_public_key_path
        self.app_notify_url = app_notify_url
        self.return_url = return_url
        self.gateway_url = gateway_url
        self.timeout = timeout

    def _request_url(self, query):
        """Return the gateway URL with ``query`` appended as its query string."""
        gateway = self.gateway_url or alipay_conf.pay_url
        if not gateway:
            raise ValueError("Alipay gateway URL is not configured")
        if gateway.endswith(("?", "&")):
            separator = ""
        else:
            separator = "&" if "?" in gateway else "?"
        return gateway + separator + query

    def create_pay(self, subject, out_trade_no, total_amount, notify_url, redirect_url, **kwargs):
        """
        Pre-create a trade and return its QR code.

        :param subject: order subject
        :param out_trade_no: merchant order number
        :param total_amount: amount in yuan with two decimals, e.g. 6.66
        :param notify_url: URL notified after a successful payment
        :param redirect_url: URL the payer is redirected to after payment
        :param kwargs: extra fields merged into ``biz_content``
        :return: the value of ``generate_qr_code`` for the returned ``qr_code``
                 when the response code is "10000" and its signature
                 verifies; otherwise None
        :raises ValueError: when no gateway URL is configured
        """
        biz_content = dict(
            out_trade_no=out_trade_no,
            qr_code_timeout_express="90m",
            subject=subject,
            total_amount=total_amount,
        )
        biz_content.update(kwargs)
        body = dict(
            app_id=self.appid,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            notify_url=notify_url,
            return_url=redirect_url,
            charset="utf-8",
            sign_type="RSA2",
            version="1.0",
            method="alipay.trade.precreate",
            biz_content=biz_content,
        )
        # The API requires the parameters sorted by key before signing.
        unsigned_items = ordered_dict(body)
        unsigned_string = join_tuple_param(unsigned_items)
        sign = rsa_sign(unsigned_string.encode("utf-8"), self.app_key_path, key_is_file=True)
        # The signature covers the raw values; the URL carries them URL-encoded.
        query = join_tuple_param(unsigned_items, quote_plus=True) + "&sign=" + quote_plus(sign)
        response = requests.get(self._request_url(query), timeout=self.timeout)
        if not response.ok:
            return None
        res_data = response.json()
        main_data = res_data["alipay_trade_precreate_response"]
        if main_data["code"] != "10000":
            return None
        qr_code = main_data["qr_code"]
        response_sign = res_data.get("sign")
        if not response_sign:
            # Without a signature the response cannot be trusted.
            return None
        # The signed text is rebuilt from the response node as JSON-like text
        # with escaped slashes.
        response_unsigned_string = join_tuple_param_alipay(ordered_dict(main_data))
        if rsa_verify_sign(response_unsigned_string, response_sign, self.alipy_key_path, key_is_file=True):
            return generate_qr_code(qr_code)
        return None

    def sign_data(self, data: dict):
        """
        Sign ``data`` and return it as a URL query string including ``sign``.

        Any existing ``sign`` entry is removed from ``data`` first.

        :param data: request parameters
        :return: ``k=v&...&sign=...`` with URL-encoded values
        """
        data.pop("sign", None)
        unsigned_items = ordered_dict(data)
        unsigned_string = join_tuple_param(unsigned_items)
        # sign() returns the signature as base64 text.
        sign = self.sign(unsigned_string.encode("utf-8"))
        quoted_string = join_tuple_param(unsigned_items, quote_plus=True)
        return quoted_string + "&sign=" + quote_plus(sign)

    @staticmethod
    def ordered_data(data):
        """
        Return ``data`` as a key-sorted list of (key, value) pairs.

        Values that are dicts are serialised to compact JSON text. ``data``
        itself is not modified.
        """
        flattened = {
            key: json.dumps(value, separators=(',', ':')) if isinstance(value, dict) else value
            for key, value in data.items()
        }
        return sorted(flattened.items())

    def sign(self, unsigned_string):
        """
        Sign ``unsigned_string`` with the application's private key.

        :param unsigned_string: data to sign
        :return: base64 signature on a single line
        """
        return rsa_sign(unsigned_string, self.app_key_path, key_is_file=True)

    def _verify(self, raw_content, signature):
        """
        Verify ``signature`` over ``raw_content`` with Alipay's public key.

        :param raw_content: text that was signed
        :param signature: base64 signature text
        :return: True when the signature verifies, otherwise False
        """
        return rsa_verify_sign(raw_content, signature, self.alipy_key_path, key_is_file=True)

    def verify(self, data, signature):
        """
        Verify the signature of a parameter dict.

        ``sign_type`` is removed from ``data`` and a ``sign`` entry, if any,
        is ignored; the remaining parameters are sorted and joined as
        ``k=v&...`` to rebuild the signed text.

        :param data: received parameters
        :param signature: base64 signature text
        :return: True when the signature verifies, otherwise False
        """
        data.pop("sign_type", None)
        payload = {k: v for k, v in data.items() if k != "sign"}
        unsigned_items = self.ordered_data(payload)
        message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items)
        return self._verify(message, signature)
# 应用的私钥参考格式
# private_key = """-----BEGIN RSA PRIVATE KEY-----
# MIIEowIBAAKCAQEAl0pU2tm6ieeK4N2AJmIhHg20FbX0ze7W3DBa+o+MpwdimqFdscqAOA+jyU9esdn4QQMB3+yVtzKsjO4UbpnNBGXE2/oHGzpAxCIgYP0QxCNF7ZYLbgs8kSrFzWRsp8JtyIQnDtuqCjYkLQGtewKsOoJIGg1qctjbGfPobNpwECCxLQ/Nns3nBuVg2NR6kXmI/qKV6qr3TMscA7bUPVk2HhRsAX9STOMHEKfEyK/lBxD52TfNnVL+U4YYFFyyiusrJpE5gAOPgc+XW4MpOBCIduic/+9prD/i4PTdv7N2XsQjgOkfdpprVJySc+HUcFMXpE+E0d71FZj+EIHnPZYBJQIDAQABAoIBAA7TyyMzyZNwbO0C6GdaoLJIV4j1L0vrh4VG+/Ook/lewOw0unENTqmv5rZ5H+fAXBNLDyj6D+ZHgh/ByaDZU/2FV9jTVVT2zZgrXA8FXlpKtrTFStN7KHF1xrMNj5SVepr3ULilELI1gjAjBPSUW3rUf+qFvBQLatXNUM8yTV9XHT5X8NyAx/RIyINTjSVIIjaq81uxwq5DpXPwOkPBskgA/J4ozTfdxlm+OKZ5/o4Pd/3ulmrkdwnJmmmHNIYWJgTwlFxUpEeF1DO5Erk+t+bVAM6j28H3F045FcB1HqvhKrjfjVSEIXy4CVyBt8TLQmGEkW91MtYk0RP1/b8l8yECgYEA+TRVy4UAmeAnJFGN9h0oI4QN8kjj0KiTx7Gl7xRQvoc9t7XhKevcH9Dvs3XCRj2fvMA9d/xh1BoaZ8xLc4qwqEWYXgWxswo7kzEFtFmgJKQ88dPZPZYO3+Cd9vZUxwil1hZkJwQD/B04z4/zVagHqYJg06LW06XjGqcMfIv56AkCgYEAm2p4b4efIQb83AfkAzSdn2YMP5DVhjeNtdOnL9o84JV5DlNz7zKiS9OlAB4sCOInECm8BkE19eHeoKcFfHz+SJCY1JYPWxXii4mTzGG1p7eIK69Ev7khxpDiZrSHpwvU8SVlmFmophrpPQeZq0gxosrOupbA70hs8Q1EZ5RTvz0CgYBMAR45mDL6u1a0yPrXGUVor5nhT0HMHd4UhaXqKPQHaA/2u84Ujw7v1TWGMmAyNBFH7AnTUnIz0lJDXheVAbOnXrJ92pa72F8bIVRwEPW6tyyvRMF4+w9GUKdc7vwkSArsJKAfFiZw+iidhXXdpgXQOSd25K9IlcuSEWjJg5eQoQKBgA50LXU56Lu6maOg/DysFQixBeyXfLQ50G0bnQ3fPxAn9pU1f6+8RsnEijDjnXbKCZYAO6NdRzZx5jGMtv7n5QI8qGoE9rKi62nMxrkYUTui3wApEby+6/w6l0O0AHWxrQEsWDF+DSg9knmBjnIWib85G1bRFGpsku0sLbNwYQWFAoGBAO6evUG81bFcf0smIxbqCBBeYbn3E9mWZnQ0UnG0hW3JyLtT/rOkYEDFa4Zdfns5xD7Blx7Eh2oQsafThbReVsPIrE/qEGjQE2g+ZUGLGxv2sdJKpTHbuqsJSMIFmNOKdhzV+BQG+I8YcOsJyqmb+nU9lwdeyWTq3kh1lXb73aBS
# -----END RSA PRIVATE KEY-----"""
# # alipay的公钥
# public_key = """-----BEGIN RSA PUBLIC KEY-----
# MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnNf1Fo39VdghVcWv8RCb6MYdZyw8WP9vYGkTIFVVyFGMEJX2WM74+K6JOneCrB8k0DZBmDn9uphuWrmchUxeDRPJ3I0mV3k+ygC4lr/lZfc8thMiK8e/keH5fCLiNYEkNAYrRSpSxkeffWE3Yng0/IsQ9Z8kiut55wngDrv4hVgPUIiNNFP3JlHGTh94S9B9WnJzaKzvND2xsNAQW2MG+ndcULBsTssfMGiJSke58zJqKXpeqRDkuEQZIl0huSrcezM/MZgLU4OkDRvKzjjgc4TmT3ZzvAQqOdddbz8G0/ee381Ll4qBvFl8EzN4CeiueNophLLwG1i31czetuKFdQIDAQAB
# -----END RSA PUBLIC KEY-----"""
# 我这里使用的配置,大家需要手动将自己的参数填进去,建议key传递保存的路径
alipay_conf = Dict(
    app_id="",
    app_key_path="",  # path of the application's private key file
    ali_key_path="",  # path of Alipay's public key file
    notify_url="",
    redirect_url="",
    # Gateway URL that the signed query string is appended to when creating
    # a payment; fill in the gateway of your environment, ending with "?"
    # (for example https://openapi.alipay.com/gateway.do?).
    pay_url="",
)
# Initialise the client
alipay_impl = AliPay(
    appid=alipay_conf.app_id,  # APPID, e.g. the one shown in the Alipay sandbox
    app_private_key_path=alipay_conf.app_key_path,  # application private key
    alipay_public_key_path=alipay_conf.ali_key_path,  # Alipay public key
    app_notify_url=alipay_conf.notify_url,  # Alipay POSTs here after a successful payment; must be reachable from the public internet
    return_url=alipay_conf.redirect_url,  # page on your site the payer is redirected to after paying
)
# Fill in the arguments below as described by the comments
base64_qr_code = alipay_impl.create_pay(
    subject="商品简单描述",  # short product description
    out_trade_no="商户订单号",  # merchant order number
    total_amount="1",  # amount in yuan, two decimal places
    notify_url="notify_url",  # server notified after a successful payment
    redirect_url="使用二维码支付可不填"  # page shown after payment; optional for QR code payments
)
# 这是支付宝异步验签,也就是客户支付成功后,异步通知给你的
def verify(data: dict, sign):
    """
    Handle Alipay's asynchronous payment notification.

    Rebuilds the signed text from the notification parameters, verifies it
    against ``sign`` with Alipay's public key, and records the outcome on the
    order. ``data`` is not modified.

    NOTE(review): ``Order`` and ``Lzlog`` are not imported in the visible part
    of this module - confirm they are in scope where this function lives.

    :param data: notification parameters sent by Alipay
    :param sign: base64 signature text taken from the notification
    :return: True when the signature verifies; False when it does not or when
             verification itself fails (the error is logged and the order is
             left untouched)
    """
    try:
        # The notification signature covers every parameter except ``sign``
        # and ``sign_type``; filter into a copy so the caller's dict is kept.
        signed_fields = {k: v for k, v in data.items() if k not in ("sign", "sign_type")}
        unsigned_str = join_tuple_param(ordered_dict(signed_fields))
        conf = Conf_Impl.get_alipy_config()
        # The order number is the part of out_trade_no before the first "-".
        order_no = data["out_trade_no"].split("-")[0]
        pay_success = rsa_verify_sign(unsigned_str, sign, conf.ali_key_path, key_is_file=True)
    except Exception as e:
        Lzlog.error("支付包异步验签过程出错: %s data: %s sign:%s" % (e, data, sign))
        return False
    Order.update_order(pay_success, order_no, "支付宝")
    return pay_success