1.工具类
# 微信h5支付
import datetime
import hashlib
import json
import os
import random
import string
import time
from base64 import b64encode, b64decode
import requests
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15, PKCS1_v1_5
from app.tools.tools import _rget, _rset # redis的get和set
def wx_h5_pay():
wx_h5_configs = {
"wx_h5_v3_key": "xxxx",
"wx_h5_v2_key": "xxxx",
"wx_h5_appid": "xxxx",
"wx_h5_mchid": "xxxx",
"wx_h5_notify_url": "https://xxx.com/xxx",
"wx_h5_serial_no": "xxxxxxxx"
}
whp = WxH5Pay(mchid=sysconfigs['wx_h5_mchid'],
appid=sysconfigs['wx_h5_appid'],
v3key=sysconfigs['wx_h5_v3_key'],
v2key=sysconfigs['wx_h5_v2_key'],
notify_url=sysconfigs['wx_h5_notify_url'],
serial_no=sysconfigs['wx_h5_serial_no'],
apiclient_key=sysconfigs['wx_h5_apiclient_key'])
return whp
class WxH5Pay:
def __init__(self, mchid, appid, v3key, v2key, notify_url, serial_no, apiclient_key):
self.mchid = mchid
self.appid = appid
self.v3key = v3key # v3的密钥
self.v2key = v2key # v2的密钥
self.notify_url = notify_url
self.serial_no = serial_no # 商户号证书序列号
self.apiclient_key = apiclient_key
# 获取h5支付的url
def get_pay(self, out_trade_no, total, description, ip):
try:
payurl = "https://api.mch.weixin.qq.com/v3/pay/transactions/h5"
data = {
"mchid": self.mchid,
"out_trade_no": out_trade_no,
"appid": self.appid,
"description": description,
"notify_url": self.notify_url,
"amount": {
"total": total,
"currency": "CNY"
},
"scene_info": {
"payer_client_ip": ip,
"h5_info": {
"type": "Wap"
}
}
}
data = json.dumps(data) # 只能序列化一次
random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
time_stamps = str(int(time.time()))
"""
HTTP请求方法\n
URL\n
请求时间戳\n
请求随机串\n
请求报文主体\n
"""
sign_str = f"POST\n{'/v3/pay/transactions/h5'}\n{time_stamps}\n{random_str}\n{data}\n"
sign = self.get_sign(sign_str)
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' + f'mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{self.serial_no}"'
}
response = requests.post(payurl, data=data, headers=headers).json()
syslog.info(f"请求微信支付:data:{data},res:{response}")
except Exception as e:
syslog.error(f"请求微信支付失败:{e}")
return None
return response
# 签名
def get_sign(self, sign_str):
basedir = os.path.abspath(os.path.dirname(__file__))
# rsa_key = RSA.importKey(open(f'{basedir}/wx_h5_pay_cert/apiclient_key.pem').read())
rsa_key = RSA.importKey(self.apiclient_key)
signer = pkcs1_15.new(rsa_key)
digest = SHA256.new(sign_str.encode('utf8'))
sign = b64encode(signer.sign(digest)).decode('utf-8')
return sign
# 回调验签
def check_notify_sign(self, timestamp, nonce, body, response_signature):
body = body.decode("utf-8")
sign_str = f"{timestamp}\n{nonce}\n{body}\n"
# print(sign_str)
publicKey = RSA.importKey(self.get_cert())
h = SHA256.new(sign_str.encode('UTF-8')) # 对响应体进行RSA加密
verifier = PKCS1_v1_5.new(publicKey) # 创建验证对象
return verifier.verify(h, b64decode(response_signature)) # 验签
# 解密
def decode_notify_data(self, res_json):
try:
ciphertext = res_json['resource']['ciphertext']
nonce = res_json['resource']['nonce']
associated_data = res_json['resource']['associated_data']
cipher = AES.new(self.v3key.encode(), AES.MODE_GCM, nonce=nonce.encode())
cipher.update(associated_data.encode())
en_data = b64decode(ciphertext.encode('utf-8'))
auth_tag = en_data[-16:]
_en_data = en_data[:-16]
plaintext = cipher.decrypt_and_verify(_en_data, auth_tag)
decodejson = json.loads(plaintext.decode())
except Exception as e:
syslog.error(f"解密回调失败:{e}")
return None
return decodejson
# 主动查询支付结果
def get_pay_status(self, out_trade_no):
try:
time_stamps = str(int(time.time()))
random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
url = f"https://api.mch.weixin.qq.com/v3/pay/transactions/out-trade-no/{out_trade_no}?mchid={self.mchid}"
data = ""
sign_str = f"GET\n{'/v3/pay/transactions/out-trade-no/'}{out_trade_no}?mchid={self.mchid}\n{time_stamps}\n{random_str}\n{data}\n"
sign = self.get_sign(sign_str)
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' + f'mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{self.serial_no}"'
}
response = requests.get(url, data=data, headers=headers).json()
except Exception as e:
syslog.error(f"查询失败:{e}")
return None
return response
# 获取回调验签的公钥
def get_cert(self):
wx_h5_public_key = _rget("wx_h5_public_key")
if wx_h5_public_key:
return wx_h5_public_key
try:
time_stamps = str(int(time.time()))
random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
url = f"https://api.mch.weixin.qq.com/v3/certificates"
data = ""
sign_str = f"GET\n{'/v3/certificates'}\n{time_stamps}\n{random_str}\n{data}\n"
sign = self.get_sign(sign_str)
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' + f'mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{self.serial_no}"'
}
response = requests.get(url, data=data, headers=headers).json()
ciphertext = response['data'][0]['encrypt_certificate']['ciphertext']
nonce = response['data'][0]['encrypt_certificate']['nonce']
associated_data = response['data'][0]['encrypt_certificate']['associated_data']
cipher = AES.new(self.v3key.encode(), AES.MODE_GCM, nonce=nonce.encode())
cipher.update(associated_data.encode())
en_data = b64decode(ciphertext.encode('utf-8'))
auth_tag = en_data[-16:]
_en_data = en_data[:-16]
plaintext = cipher.decrypt_and_verify(_en_data, auth_tag)
response = plaintext.decode()
except Exception as e:
syslog.error(f"查询失败:{e}")
return None
syslog.info("====重新查询了回调验签用到的微信公钥====")
_rset("wx_h5_public_key", response, 3600)
return response
if __name__ == '__main__':
whp = wx_h5_pay()
print(whp.get_cert())
2.使用
order_no = "100000000001" #订单编号
amount = 1 # 订单金额
subject = "测试支付" # 商品名称
ip = "xxx" # 客户端ip
whp = wx_h5_pay()
resjson = whp.get_pay(out_trade_no=order_no, total=amount, description=subject, ip=ip)
h5_url = resjson.get("h5_url", "") # 得到支付的跳转地址
# 理论上这里就可以发起支付了,支付完成后会跳转回原来发起支付页面
return_url = "" # 支付后的跳转地址
h5_url = f"{h5_url}&redirect_url={return_url}" # 这里也跟一个重定向地址,如果跟了,那么支付完成后会跳转这个地址
3.支付结果异步回调
# 微信h5支付回调
@app.route('/api/pay_notify_wxh5', methods=['POST'])
def pay_notify_wxh5():
whp = wx_h5_pay()
data = request.get_data()
headers = request.headers
logger.info(f"回调请求头:{headers}")
logger.info(f"回调信息:{data}")
timestamp = request.headers.get("Wechatpay-Timestamp", None)
nonce = request.headers.get("Wechatpay-Nonce", None)
signature = request.headers.get("Wechatpay-Signature", None)
# 验签
res = {
"code": "FAIL",
"message": "失败"
}
check = whp.check_notify_sign(timestamp=timestamp, nonce=nonce, body=data, response_signature=signature)
syslog.info(f"签名验证结果:{check}")
if not check:
syslog.error("签名验证失败!")
return jsonify(res), 400
# 解密
try:
jsondata = json.loads(data.decode("utf-8"))
except Exception as e:
syslog.error("回调参数转json失败!")
return jsonify(res), 400
resjson = whp.decode_notify_data(jsondata)
if not resjson:
return jsonify(res), 400
# 获取解密后的参数,进行判断
"""
一般是这样的一个结构
{
"amount": {
"currency": "CNY",
"payer_currency": "CNY",
"payer_total": 1,
"total": 1
},
"appid": "xxxxx",
"attach": "",
"bank_type": "OTHERS",
"mchid": "xxxxx",
"out_trade_no": "xxxxxxxxxxxxxx",
"payer": {
"openid": "xxxxxxxxxxxxxxxx"
},
"success_time": "2022-03-25T11:59:47+08:00",
"trade_state": "SUCCESS",
"trade_state_desc": "支付成功",
"trade_type": "MWEB",
"transaction_id": "xxxxxxxxxxxxxxxxxxx"
}
"""
# 解密参数后,即可写相关的逻辑
4.主动查询支付结果。当然,用户支付完成后,我们只等回调也不太显示,最好是前端能主动掉一下查询接口,也就是说,当支付完成后跳转支付结果页,掉后端的查询支付结果接口。如果查询到还没有回调过来的话,就需要后端主动去查询微信支付的结果。
# 微信h5支付查询结果
whp = wx_h5_pay()
order_no = "xxxxxxxx" # 发起支付的订单号
resjson = whp.get_pay_status(out_trade_no=order_no)
logger.info(f"订单{order_no}主动查询支付状态:{resjson}")
总结:省略了很多细节,但是总体关键步骤都是有的。