python接入微信平台超时,微信开放平台接入流程 python

钮兴安
2023-12-01

1. 验证票据

开放平台配置推送url后,微信会每隔10分钟post 数据到相应接口。接口需直接返回 success字符串。

def wx_handler(self, request, *args, **kwargs):

msg_sign = request.GET.get('msg_signature', "")

timestamp = request.GET.get('timestamp', "")

nonce = request.GET.get('nonce', "")

from_xml = request.body.decode('utf-8')

# 判断数据格式

if from_xml.startswith(''):

# 解码,使用开放平台配置信息

decrypt_test = WXBizMsgCrypt(open_platform.wx_token, open_platform.encodingAESKey, open_platform.appid)

ret, decryp_xml = decrypt_test.DecryptMsg(from_xml, msg_sign, timestamp, nonce)

if ret == 0:

# 获取凭证

ticket = decrypt_test.extract_ticket(decryp_xml)

return HttpResponse("success")

解码模块代码附在最后

component_verify_ticket 的有效时间为12小时。

2. 获取令牌

令牌(component_access_token)是第三方平台接口的调用凭据,有效期2小时。

使用 1 获取的 component_verify_ticket 调用接口获取令牌

data = {

"component_appid": appid,

"component_appsecret": appsecret,

"component_verify_ticket": component_verify_ticket

}

token_url = 'https://api.weixin.qq.com/cgi-bin/component/api_component_token'

# 接口请求

res = requests.post(token_url, json.dumps(data))

res_data = json.loads(res.content.decode('utf-8'))

component_access_token = res_data['component_access_token']

3. 获取预授权码

预授权码用于公众号授权,

使用 2 获取的 component_access_token 调用接口获取

POST https://api.weixin.qq.com/cgi-bin/component/api_create_preauthcode?component_access_token=COMPONENT_ACCESS_TOKEN

4. 授权页面

需要在链接上填写 3 获取的 pre_auth_code 以及 回调地址 redirect_uri

授权页面打开,需要公众号管理员扫码授权。

https://mp.weixin.qq.com/cgi-bin/componentloginpage?component_appid=wxb6048415cdc1e394&pre_auth_code={{ pre_auth_code }}&redirect_uri={{ redirect_uri}}

5. 授权回调接口

扫码授权成功后,微信会发送 auth_code 到 4 的 redirect_uri

使用 2 获取的 ** component_access_token** 调用接口获取授权信息

def rewrite_auth_code_handler(self, request, *args, **kwargs):

auth_code = request.GET.get('auth_code')

# 获取调用接口凭证 authorizer_access_token

url = "https://api.weixin.qq.com/cgi-bin/component/api_query_auth?component_access_token={}".format(component_access_token)

data = {

"component_appid": appid,

"authorization_code": auth_code

}

res = requests.post(url, json.dumps(data))

res_data = json.loads(res.content.decode('utf-8'))

authorization_info = res_data['authorization_info']

authorizer_appid = authorization_info['authorizer_appid']

authorizer_access_token = authorization_info['authorizer_access_token']

authorizer_refresh_token = authorization_info['authorizer_refresh_token']

最后,拿到

authorizer_appid

授权公众的appid , 用于刷新令牌

authorizer_access_token

开发接口授权令牌, 有效期为 ** 2 小时**

authorizer_refresh_token

刷新令牌

6. 刷新接口令牌

超时后, 使用 3 获取的 component_access_token, 5 获取的 authorizer_appid,authorizer_refresh_token 调用接口重新获取authorizer_refresh_token

POST https://api.weixin.qq.com/cgi-bin/component/api_authorizer_token?component_access_token=COMPONENT_ACCESS_TOKEN

使用 5 获取到的 authorizer_refresh_token ,就可以进行微信开发了。

解码模块代码:

import base64

import string

import random

import hashlib

import time

import struct

from Crypto.Cipher import AES

import xml.etree.cElementTree as ET

import socket

WXBizMsgCrypt_OK = 0

WXBizMsgCrypt_ValidateSignature_Error = -40001

WXBizMsgCrypt_ParseXml_Error = -40002

WXBizMsgCrypt_ComputeSignature_Error = -40003

WXBizMsgCrypt_IllegalAesKey = -40004

WXBizMsgCrypt_ValidateAppid_Error = -40005

WXBizMsgCrypt_EncryptAES_Error = -40006

WXBizMsgCrypt_DecryptAES_Error = -40007

WXBizMsgCrypt_IllegalBuffer = -40008

WXBizMsgCrypt_EncodeBase64_Error = -40009

WXBizMsgCrypt_DecodeBase64_Error = -40010

""" AES加解密用 pycrypto """

class FormatException(Exception):

pass

def throw_exception(message, exception_class=FormatException):

"""my define raise exception function"""

raise exception_class(message)

class SHA1:

"""计算公众平台的消息签名接口"""

def getSHA1(self, token, timestamp, nonce, encrypt):

"""用SHA1算法生成安全签名

@param token: 票据

@param timestamp: 时间戳

@param encrypt: 密文

@param nonce: 随机字符串

@return: 安全签名

"""

try:

token = token.decode()

sortlist = [token, timestamp, nonce, encrypt]

sortlist.sort()

sha = hashlib.sha1()

sha.update("".join(sortlist).encode("utf8"))

return WXBizMsgCrypt_OK, sha.hexdigest()

except Exception as e:

print(e)

return WXBizMsgCrypt_ComputeSignature_Error, None

class XMLParse(object):

"""提供提取消息格式中的密文及生成回复消息格式的接口"""

# xml消息模板

AES_TEXT_RESPONSE_TEMPLATE = """%(timestamp)s"""

def extract(self, xmltext):

"""提取出xml数据包中的加密消息

@param xmltext: 待提取的xml字符串

@return: 提取出的加密消息字符串

"""

try:

xml_tree = ET.fromstring(xmltext)

encrypt = xml_tree.find("Encrypt")

return WXBizMsgCrypt_OK, encrypt.text

except Exception as e:

print(e)

return WXBizMsgCrypt_ParseXml_Error, None, None

def generate(self, encrypt, signature, timestamp, nonce):

"""生成xml消息

@param encrypt: 加密后的消息密文

@param signature: 安全签名

@param timestamp: 时间戳

@param nonce: 随机字符串

@return: 生成的xml字符串

"""

resp_dict = {

'msg_encrypt': encrypt,

'msg_signaturet': signature,

'timestamp': timestamp,

'nonce': nonce,

}

resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict

return resp_xml

class PKCS7Encoder(object):

"""提供基于PKCS7算法的加解密接口"""

block_size = 32

def encode(self, text):

""" 对需要加密的明文进行填充补位

@param text: 需要进行填充补位操作的明文

@return: 补齐明文字符串

"""

text_length = len(text)

# 计算需要填充的位数

amount_to_pad = self.block_size - (text_length % self.block_size)

if amount_to_pad == 0:

amount_to_pad = self.block_size

# 获得补位所用的字符

pad = chr(amount_to_pad).encode()

return text + pad * amount_to_pad

def decode(self, decrypted):

"""删除解密后明文的补位字符

@param decrypted: 解密后的明文

@return: 删除补位字符后的明文

"""

pad = ord(decrypted[-1])

if pad < 1 or pad > 32:

pad = 0

return decrypted[:-pad]

class Prpcrypt(object):

"""提供接收和推送给公众平台消息的加解密接口"""

def __init__(self, key):

# self.key = base64.b64decode(key+"=")

self.key = key

# 设置加解密模式为AES的CBC模式

self.mode = AES.MODE_CBC

def encrypt(self, text, appid):

"""对明文进行加密

@param text: 需要加密的明文

@return: 加密得到的字符串

"""

# 16位随机字符串添加到明文开头

len_str = struct.pack("I", socket.htonl(len(text.encode())))

# text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid

text = self.get_random_str() + len_str + text.encode() + appid

# 使用自定义的填充方式对明文进行补位填充

pkcs7 = PKCS7Encoder()

text = pkcs7.encode(text)

# 加密

cryptor = AES.new(self.key, self.mode, self.key[:16])

try:

ciphertext = cryptor.encrypt(text)

# 使用BASE64对加密后的字符串进行编码

return WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')

except Exception as e:

return WXBizMsgCrypt_EncryptAES_Error, None

def decrypt(self, text, appid):

"""对解密后的明文进行补位删除

@param text: 密文

@return: 删除填充补位后的明文

"""

try:

cryptor = AES.new(self.key, self.mode, self.key[:16])

# 使用BASE64对密文进行解码,然后AES-CBC解密

plain_text = cryptor.decrypt(base64.b64decode(text))

except Exception as e:

print(e)

return WXBizMsgCrypt_DecryptAES_Error, None

try:

# pad = ord(plain_text[-1])

pad = plain_text[-1]

# 去掉补位字符串

# pkcs7 = PKCS7Encoder()

# plain_text = pkcs7.encode(plain_text)

# 去除16位随机字符串

content = plain_text[16:-pad]

xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])

xml_content = content[4: xml_len + 4]

from_appid = content[xml_len + 4:]

except Exception as e:

return WXBizMsgCrypt_IllegalBuffer, None

if from_appid != appid:

return WXBizMsgCrypt_ValidateAppid_Error, None

return 0, xml_content.decode()

def get_random_str(self):

""" 随机生成16位字符串

@return: 16位字符串

"""

rule = string.ascii_letters + string.digits

str = random.sample(rule, 16)

return "".join(str).encode()

class WXBizMsgCrypt():

# 构造函数

# token4wx

# @param sToken: 公众平台上,开发者设置的Token

# token4wxtoken4wxtoken4wxtoken4wxtoken4wxtok

# @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey

# wxb6048415cdc1e394

# @param sAppId: 企业号的AppId

def __init__(self, sToken, sEncodingAESKey, sAppId):

try:

self.key = base64.b64decode(sEncodingAESKey + "=")

assert len(self.key) == 32

except Exception:

throw_exception("[error]: EncodingAESKey unvalid !", FormatException)

self.token = sToken.encode()

self.appid = sAppId.encode()

def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):

# 将公众号回复用户的消息加密打包

# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串

# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间

# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce

# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,

# return:成功0,sEncryptMsg,失败返回对应的错误码None

pc = Prpcrypt(self.key)

ret, encrypt = pc.encrypt(sReplyMsg, self.appid)

if ret != 0:

return ret, None

if timestamp is None:

timestamp = str(int(time.time()))

# 生成安全签名

sha1 = SHA1()

ret, signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)

if ret != 0:

return ret, None

xmlParse = XMLParse()

return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)

def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):

sha1 = SHA1()

ret,signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, sEchoStr)

if ret != 0:

return ret, None

if not signature == sMsgSignature:

return WXBizMsgCrypt_ValidateSignature_Error, None

pc = Prpcrypt(self.key)

ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.appid)

return ret,sReplyEchoStr

def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):

# 检验消息的真实性,并且获取解密后的明文

# @param sMsgSignature: 签名串,对应URL参数的msg_signature

# @param sTimeStamp: 时间戳,对应URL参数的timestamp

# @param sNonce: 随机串,对应URL参数的nonce

# @param sPostData: 密文,对应POST请求的数据

# xml_content: 解密后的原文,当return返回0时有效

# @return: 成功0,失败返回对应的错误码

# 验证安全签名

xmlParse = XMLParse()

ret, encrypt = xmlParse.extract(sPostData)

if ret != 0:

return ret, None

sha1 = SHA1()

ret, signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)

if ret != 0:

return ret, None

if not signature == sMsgSignature:

return WXBizMsgCrypt_ValidateSignature_Error, None

pc = Prpcrypt(self.key)

ret, xml_content = pc.decrypt(encrypt, self.appid)

return ret, xml_content

 类似资料: