公共模块(public_method.py),log模块大家自己处理下,我的就不放上去了""" 定义公共方法 """ import os import io import time import json import hashlib import base64 import datetime from urllib import parse import qrcode import xmltodict from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from log import Lzlog def rsa_verify_sign(data_str, sign, secret_key, key_is_file=False, need_import=False): """ :param data_str: 等待验签的数据 :param sign: 数据的签名 :param secret_key: 秘钥 :param key_is_file: 秘钥是否是一个文件目录 :param need_import: 是否需要预导入秘钥 :return: """ secret_key = get_secret(secret_key, key_is_file, need_import) signer = PKCS1_v1_5.new(secret_key) digest = SHA256.new() digest.update(data_str.encode("utf8")) if signer.verify(digest, base64.decodebytes(sign.encode("utf8"))): return True return False def get_secret(secret_key, key_is_file=False, need_import=False): """ :param secret_key: 秘钥、或者秘钥路径 :param key_is_file: 标识秘钥是否是一个文件 :param need_import: 是否需要做导入处理 :return: 秘钥 """ res = secret_key if secret_key: if key_is_file: res = read_secret(secret_key) else: if need_import: res = RSA.importKey(secret_key) return res def read_secret(secret_path, import_key=True): """ 从文件加载秘钥 :param secret_path: :param import_key: 是否需要导入秘钥 :return: """ with open(secret_path, "r") as fp: return RSA.importKey(fp.read()) if import_key else fp.read() def rsa_sign(unsigned_string, secret_key, key_is_file=False, need_import=False) -> str: """ RSA数字签名协议根据PKCS#1 v1.5 :param unsigned_string: 等待签名的字符串 :param secret_key: 秘钥 :param need_import: 如果为True, 则初始化秘钥 :param key_is_file: 为True 表示需要从文件读取 :return: 将签名base64编码 """ if unsigned_string and secret_key: if key_is_file: secret_key = read_secret(secret_key) else: if need_import: secret_key = RSA.importKey(secret_key) # 创建一个签名对象 signer = PKCS1_v1_5.new(secret_key) # 签名 signature = signer.sign(SHA256.new(unsigned_string)) # base64 编码,转换为unicode表示并移除回车 sign = base64.encodebytes(signature).decode("utf8").replace("\n", "") return sign return '' def join_tuple_param(data: list, quote_plus=False) -> str: """ :param data: [(key, value), (key2, value2)] :param quote_plus: 对于key、value中出现了&、=之类的符号会进行编码 :return: key=value&key2=value2 """ if isinstance(data, list): if quote_plus: return "&".join("{0}={1}".format(k, parse.quote_plus(v)) for k, v in data) return "&".join("{0}={1}".format(k, v) for k, v in data) return '' def join_tuple_param_alipay(data: list) -> str: """ :param data: [(key, value), (key2, value2)] :return: key=value&key2=value2 """ if isinstance(data, list): return "{" + ",".join('"{0}":"{1}"'.format(k, v.replace('/', '\/')) for k, v in data) + "}" return '' def ordered_dict(data: dict) -> []: """ 将字典排队 :param data: 字典 :return: [(key, value), (key2, value2)] """ if isinstance(data, dict): complex_keys = [] for key, value in data.items(): if isinstance(value, dict): complex_keys.append(key) # 将字典类型的数据dump出来 for key in complex_keys: # for k, v in data[key].items(): data[key] = json.dumps(data[key], separators=(',', ':')) return sorted([(k, v) for k, v in data.items()]) return [] def generate_qr_code(url: str) -> str: """ 创建一个二维码 :param url: 二维码url :return: base64编码的图片 """ try: qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=12, border=0.1, ) qr.make(fit=True) qr.add_data(url) img = qr.make_image(fill_color="white", back_color="#000000") buf = io.BytesIO() img.save(buf, format='PNG') return 'data:image/png;base64,' + base64.b64encode(buf.getvalue()).decode() except Exception as e: Lzlog.error('二维码生成错误%s' % e) return '' def sign_md5(msg: bytes) -> str: """ 将传递的字节数据签名 :param msg: :return: """ if isinstance(msg, bytes): m = hashlib.md5() m.update(msg) return m.hexdigest() return '' def xml_to_dict(xml_str: str) -> dict: data_orderedD = xmltodict.parse(xml_str) return json.loads(json.dumps(data_orderedD, indent=4)) def dicttoxml(dict_data: dict) -> str: return xmltodict.unparse(dict_data, pretty=True, encoding='utf-8') def now_datetime() -> str: return str(datetime.datetime.fromtimestamp(int(time.time()))) def now_date() -> str: return str(datetime.date.today()) def timestamp() -> int: return int(time.time()) def long_timestamp() -> str: return str(time.time()).replace(".", "") class Dict(dict): """ 使字典可以属性方式访问值 """ def __getattr__(self, name): return self.get(name) def __setattr__(self, key, value): self[key] = value def catch_error(func): """ 捕获函数运行异常装饰器 :param func: :return: """ def inner(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: Lzlog.error(f"{func.__qualname__} 发生异常 {e}", exc_info=1) return dict(code=-1, msg='接口内部发生异常', data=None) return inner def param_diff(param: [dict, list], _in: [dict, list]): """ 检查 _in 是否包含了 param中的所有项 如果传递的是 dict 会将它们的key转换为一个集合,并使用集合的 difference 方法判断差集来实现 :param param: :param _in: :return: """ if isinstance(param, dict): param = set(param.keys()) if isinstance(_in, dict): _in = set(_in.keys()) return list(set(param).difference(set(_in))) def get_res(code=0, msg='success', data=None) -> dict: return dict(code=code, msg=msg, data=data) def file_base_name(abs_path: str) -> str: return os.path.basename(abs_path) if __name__ == '__main__': need =["name", "age", "hobby", "sex"] args = {"name": 1, "age": 2, "hobby": 1, "sex": 1} print(param_diff(need, args)) # print({1,2,3}.difference({1,2}) )
微信支付模块(wechat.py)
import requests from config import Conf_Impl from copy import deepcopy from log import Lzlog from public_mthod import timestamp, sign_md5, xml_to_dict, dicttoxml, generate_qr_code, ordered_dict, join_tuple_param, \ read_secret class WechatPay: def __init__(self): self.notify_url = "http://127.0.0.1/notify" self.key_secert = "T6m9iK73b0kn9g5v426MKfHQH7X8rKwb" # 开发者秘钥 self.app_id = "wx8397f8696b538317" # 应用ID self.mch_id = "1473426802" # 商户ID def create_pay(self, title, number, money): try: body = dict() body["appid"] = self.app_id body["mch_id"] = self.mch_id body["nonce_str"] = str(timestamp()) # 一个随机的字符串 body["body"] = title # 商品的标题 body["out_trade_no"] = number # 商品的订单号,可以使用随机字符串生成,记得保存好 body["total_fee"] = money # 商品的价格,单位是分,支付的时候建议写 1,也就是一分 body["spbill_create_ip"] = "127.0.0.1" body["notify_url"] = self.notify_url # 这个接口地址是你自己的,微信支付成功后会调用改接口通知你 body["trade_type"] = "NATIVE" # 代表订单预创建方式,也就是生成二维码支付 # 排序 unsigned_params = ordered_dict(body) # 使用固定格式拼接 string_param = join_tuple_param(unsigned_params) + "&key=" + self.key_secert # md5签名 sign = sign_md5(string_param.encode("utf-8")) # 保存签名 body["sign"] = sign title = dict(xml=body) # 使用xml格式通信 request_data = dicttoxml(title) response = requests.post(url="https://api.mch.weixin.qq.com/pay/unifiedorder", data=request_data.encode("utf-8")) if response.status_code == 200: # 解析为字典 res = xml_to_dict(response.text) # 签名值拿出来,不参与排序 sign = res["xml"].pop("sign", None) # 排序 unsigned_item = ordered_dict(deepcopy(res["xml"])) unsinged_str = join_tuple_param(unsigned_item) + "&key=" + self.key_secert # 对比签名,判断数据是否被修改过 if sign == sign_md5(unsinged_str.encode()).upper(): if res["xml"]["return_code"] == "SUCCESS" and res["xml"]["result_code"] == "SUCCESS": # 二维码url return generate_qr_code(res["xml"]["code_url"]) else: Lzlog.error("订单创建信息有误:签名不正确") return False except Exception as e: print("支付错误", e) return False def verify(self, body_xml): """ 根据微信支付服务器返回信息,更新订单状态 :param body_xml: :return: """ res_dict = xml_to_dict(body_xml) sign = res_dict["xml"].pop("sign", None) unsigner_item = ordered_dict(deepcopy(res_dict["xml"])) unsigner_string = join_tuple_param(unsigner_item) + "&key=" + self.key_secert order_no = res_dict["xml"]["out_trade_no"] # 对微信返回的数据进行签名,然后判断签名是否一致 my_sign = sign_md5(unsigner_string.encode("utf-8")).upper() True if sign == my_sign else False wechat_pay = WechatPay() # 创建订单 qr_code_url = wechat_pay.create_pay("MBP2016", "201621302139021", 12900 * 100) # 这个方法自己写在微信回调的接口中进行验证 # wechat_pay.verify()
支付宝(alipay.py)
# _*_ coding=utf-8 _*_ from datetime import datetime from urllib.parse import quote_plus, urlencode from public_mthod import ordered_dict, join_tuple_param, rsa_sign, read_secret, generate_qr_code, rsa_verify_sign, join_tuple_param_alipay, Dict from copy import deepcopy from config import Conf_Impl import json from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from base64 import encodebytes, decodebytes import requests class AliPay(object): """ 支付宝支付接口(PC端支付接口) """ def __init__(self, appid, app_private_key_path, alipay_public_key_path, app_notify_url, return_url): self.appid = appid self.app_key_path = app_private_key_path self.alipy_key_path = alipay_public_key_path self.app_notify_url = app_notify_url self.return_url = return_url def create_pay(self, subject, out_trade_no, total_amount, notify_url, redirect_url, **kwargs): """ 创建订单 :param subject: 订单主题 :param out_trade_no: 订单号 :param total_amount: 订单金额,单位:元,精确位数:2,示例:6.66 :param notify_url: 支付成功通知地址 :param redirect_url: 支付成功后重定向定制 :param kwargs: 扩展信息 :return: """ body = dict( app_id=self.appid, # 应用id timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), notify_url=notify_url, return_url=redirect_url, charset="utf-8", sign_type="RSA2", version="1.0", # method="alipay.trade.page.pay", method="alipay.trade.precreate", biz_content=dict( # 订单基本内容 out_trade_no=out_trade_no, qr_code_timeout_express="90m", subject=subject, total_amount=total_amount # product_code="FAST_INSTANT_TRADE_PAY", ) ) # 支持扩展字段 body["biz_content"].update(kwargs) # 将传递的参数进行排队,接口对接要求 unsigned_items = ordered_dict(body) # 在使用&符号将参赛连接起来 unsigned_string = join_tuple_param(unsigned_items) # 签名 sign = rsa_sign(unsigned_string.encode("utf-8"), self.app_key_path, key_is_file=True) # 在使用&符号将参赛连接起来 query_parma = join_tuple_param(unsigned_items, quote_plus=True) # 拼接查询参数 signed_string = query_parma + "&sign=" + quote_plus(sign) response = requests.get(alipay_conf.pay_url + signed_string) if response.ok: res_data = response.json() main_data = res_data["alipay_trade_precreate_response"] if main_data["code"] == "10000": response_sign = res_data.pop("sign") response_unsigned_items = ordered_dict(main_data) # 这里这个拼接的我搞了好久,最后终于成功了。 response_unsigned_string = join_tuple_param_alipay(response_unsigned_items) res = rsa_verify_sign(response_unsigned_string, response_sign, self.alipy_key_path, key_is_file=True) if res_data["alipay_trade_precreate_response"]["code"] == "10000" and res: return generate_qr_code(res_data["alipay_trade_precreate_response"]["qr_code"]) return None def sign_data(self, data: dict): data.pop("sign", None) # 将传递的参数进行排队,接口对接要求 unsigned_items = ordered_dict(data) # 在使用&符号将参赛连接起来 unsigned_string = join_tuple_param(unsigned_items) # 将数据签名,返回是一个经过base64编码的字符串 sign = self.sign(unsigned_string.encode("utf-8")) quoted_string = "&".join("{0}={1}".format(k, quote_plus(v)) for k, v in unsigned_items) # 获得最终的订单信息字符串 signed_string = quoted_string + "&sign=" + quote_plus(sign) print(signed_string) return signed_string @staticmethod def ordered_data(data): complex_keys = [] for key, value in data.items(): if isinstance(value, dict): complex_keys.append(key) # 将字典类型的数据dump出来 for key in complex_keys: data[key] = json.dumps(data[key], separators=(',', ':')) return sorted([(k, v) for k, v in data.items()]) def sign(self, unsigned_string): # 开始计算签名 key = read_secret(self.app_key_path) signer = PKCS1_v1_5.new(key) signature = signer.sign(SHA256.new(unsigned_string)) # base64 编码,转换为unicode表示并移除回车 sign = encodebytes(signature).decode("utf8").replace("\n", "") return sign def _verify(self, raw_content, signature): # 开始计算签名 key = self.alipay_public_key signer = PKCS1_v1_5.new(key) digest = SHA256.new() digest.update(raw_content.encode("utf8")) if signer.verify(digest, decodebytes(signature.encode("utf8"))): return True return False def verify(self, data, signature): if "sign_type" in data: sign_type = data.pop("sign_type") # 排序后的字符串 unsigned_items = self.ordered_data(data) message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items) return self._verify(message, signature) # 应用的私钥参考格式 # private_key = """-----BEGIN RSA PRIVATE KEY----- # MIIEowIBAAKCAQEAl0pU2tm6ieeK4N2AJmIhHg20FbX0ze7W3DBa+o+MpwdimqFdscqAOA+jyU9esdn4QQMB3+yVtzKsjO4UbpnNBGXE2/oHGzpAxCIgYP0QxCNF7ZYLbgs8kSrFzWRsp8JtyIQnDtuqCjYkLQGtewKsOoJIGg1qctjbGfPobNpwECCxLQ/Nns3nBuVg2NR6kXmI/qKV6qr3TMscA7bUPVk2HhRsAX9STOMHEKfEyK/lBxD52TfNnVL+U4YYFFyyiusrJpE5gAOPgc+XW4MpOBCIduic/+9prD/i4PTdv7N2XsQjgOkfdpprVJySc+HUcFMXpE+E0d71FZj+EIHnPZYBJQIDAQABAoIBAA7TyyMzyZNwbO0C6GdaoLJIV4j1L0vrh4VG+/Ook/lewOw0unENTqmv5rZ5H+fAXBNLDyj6D+ZHgh/ByaDZU/2FV9jTVVT2zZgrXA8FXlpKtrTFStN7KHF1xrMNj5SVepr3ULilELI1gjAjBPSUW3rUf+qFvBQLatXNUM8yTV9XHT5X8NyAx/RIyINTjSVIIjaq81uxwq5DpXPwOkPBskgA/J4ozTfdxlm+OKZ5/o4Pd/3ulmrkdwnJmmmHNIYWJgTwlFxUpEeF1DO5Erk+t+bVAM6j28H3F045FcB1HqvhKrjfjVSEIXy4CVyBt8TLQmGEkW91MtYk0RP1/b8l8yECgYEA+TRVy4UAmeAnJFGN9h0oI4QN8kjj0KiTx7Gl7xRQvoc9t7XhKevcH9Dvs3XCRj2fvMA9d/xh1BoaZ8xLc4qwqEWYXgWxswo7kzEFtFmgJKQ88dPZPZYO3+Cd9vZUxwil1hZkJwQD/B04z4/zVagHqYJg06LW06XjGqcMfIv56AkCgYEAm2p4b4efIQb83AfkAzSdn2YMP5DVhjeNtdOnL9o84JV5DlNz7zKiS9OlAB4sCOInECm8BkE19eHeoKcFfHz+SJCY1JYPWxXii4mTzGG1p7eIK69Ev7khxpDiZrSHpwvU8SVlmFmophrpPQeZq0gxosrOupbA70hs8Q1EZ5RTvz0CgYBMAR45mDL6u1a0yPrXGUVor5nhT0HMHd4UhaXqKPQHaA/2u84Ujw7v1TWGMmAyNBFH7AnTUnIz0lJDXheVAbOnXrJ92pa72F8bIVRwEPW6tyyvRMF4+w9GUKdc7vwkSArsJKAfFiZw+iidhXXdpgXQOSd25K9IlcuSEWjJg5eQoQKBgA50LXU56Lu6maOg/DysFQixBeyXfLQ50G0bnQ3fPxAn9pU1f6+8RsnEijDjnXbKCZYAO6NdRzZx5jGMtv7n5QI8qGoE9rKi62nMxrkYUTui3wApEby+6/w6l0O0AHWxrQEsWDF+DSg9knmBjnIWib85G1bRFGpsku0sLbNwYQWFAoGBAO6evUG81bFcf0smIxbqCBBeYbn3E9mWZnQ0UnG0hW3JyLtT/rOkYEDFa4Zdfns5xD7Blx7Eh2oQsafThbReVsPIrE/qEGjQE2g+ZUGLGxv2sdJKpTHbuqsJSMIFmNOKdhzV+BQG+I8YcOsJyqmb+nU9lwdeyWTq3kh1lXb73aBS # -----END RSA PRIVATE KEY-----""" # # alipay的公钥 # public_key = """-----BEGIN RSA PUBLIC KEY----- # MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnNf1Fo39VdghVcWv8RCb6MYdZyw8WP9vYGkTIFVVyFGMEJX2WM74+K6JOneCrB8k0DZBmDn9uphuWrmchUxeDRPJ3I0mV3k+ygC4lr/lZfc8thMiK8e/keH5fCLiNYEkNAYrRSpSxkeffWE3Yng0/IsQ9Z8kiut55wngDrv4hVgPUIiNNFP3JlHGTh94S9B9WnJzaKzvND2xsNAQW2MG+ndcULBsTssfMGiJSke58zJqKXpeqRDkuEQZIl0huSrcezM/MZgLU4OkDRvKzjjgc4TmT3ZzvAQqOdddbz8G0/ee381Ll4qBvFl8EzN4CeiueNophLLwG1i31czetuKFdQIDAQAB # -----END RSA PUBLIC KEY-----""" # 我这里使用的配置,大家需要手动将自己的参数填进去,建议key传递保存的路径 alipay_conf = Dict( app_id="", app_key_path="", ali_key_path="", notify_url="", redirect_url="" ) # 初始化 alipay_impl = AliPay( appid=alipay_conf.app_id, # 支付宝沙箱里面的APPID app_private_key_path=alipay_conf.app_key_path, # 支付宝公钥 alipay_public_key_path=alipay_conf.ali_key_path, # 应用私钥 app_notify_url=alipay_conf.notify_url, # 如果支付成功,支付宝会向这个地址发送POST请求(校验是否支付已经完成),此地址要能够在公网进行访问 return_url=alipay_conf.redirect_url, # 如果支付成功,重定向回到你的网站的地址。 ) # 下面参数按照注释填写上去 base64_qr_code = alipay_impl.create_pay( subject="商品简单描述", # 商品简单描述 out_trade_no="商户订单号", # 商户订单号 total_amount="1", # 交易金额(单位: 元 保留俩位小数) notify_url="notify_url", # 支付成功后通知服务器 redirect_url="使用二维码支付可不填" # 支付成功后跳转的页面 ) # 这是支付宝异步验签,也就是客户支付成功后,异步通知给你的 def verify(data: dict, sign): try: unsigned_item = ordered_dict(data) unsigned_str = join_tuple_param(unsigned_item) alipay_conf = Conf_Impl.get_alipy_config() order_no = data["out_trade_no"].split("-")[0] pay_success = rsa_verify_sign(unsigned_str, sign, alipay_conf.ali_key_path, key_is_file=True) except Exception as e: Lzlog.error("支付包异步验签过程出错: %s data: %s sign:%s" % (e, data, sign)) else: Order.update_order(pay_success, order_no, "支付宝")