【转】Python3钉钉回调消息体加密、解密

满自明
2023-12-01

原文链接:https://sxfblog.com/index.php/archives/388.html
在此先做记录,目前正在开发钉钉微应用,后续肯定会用到的

Python3钉钉回调消息体加密、解密

官方参考文档:https://open-doc.dingtalk.com/microapp/faquestions/ltr370

官方的python sdk没有回调的消息体加解密,只有java、php、c#。上网也找了一些资料,终于整好了,下面的Python代码为Python3.5版本的,python2的代码就直接改一下部分库。或则语法即可。

直接上加解密Class: (将这个class单独整成一个.py供后续的引用和调用)

# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct
from random import choice
from Crypto.Cipher import AES


class DingTalkCrypto:
    def __init__(self, encodingAesKey, key):
        self.encodingAesKey = encodingAesKey
        self.key = key
        self.aesKey = base64.b64decode(self.encodingAesKey + '=')

    def encrypt(self, content):
        """
        加密
        """
        msg_len = self.length(content)
        content = self.generateRandomKey(16) + msg_len.decode() + content + self.key
        contentEncode = self.pks7encode(content)
        iv = self.aesKey[:16]
        aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        aesEncrypt = aesEncode.encrypt(contentEncode)
        return base64.b64encode(aesEncrypt).decode().replace('\n', '')

    def length(self, content):
        """
        将msg_len转为符合要求的四位字节长度
        """
        l = len(content)
        return struct.pack('>l', l)

    def pks7encode(self, content):
        """
        安装 PKCS#7 标准填充字符串
        """
        l = len(content)
        output = io.StringIO()
        val = 32 - (l % 32)
        for _ in xrange(val):
            output.write('%02x' % val)
        return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())

    def pks7decode(self, content):
        nl = len(content)
        val = int(binascii.hexlify(content[-1].encode()), 16)
        if val > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        l = nl - val
        return content[:l]

    def decrypt(self, content):
        """
        解密数据
        """
        # 钉钉返回的消息体
        content = base64.b64decode(content)
        iv = self.aesKey[:16]  # 初始向量
        aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        decodeRes = aesDecode.decrypt(content)[20:].decode().replace(self.key, '')
        # 获取去除初始向量,四位msg长度以及尾部corpid
        return self.pks7decode(decodeRes)

    def generateRandomKey(self, size,
                          chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
        """
        生成加密所需要的随机字符串
        """
        return ''.join(choice(chars) for i in range(size))

    def generateSignature(self, nonce, timestamp, token, msg_encrypt):
        """
        生成签名
        """
        signList = ''.join(sorted([nonce, timestamp, token, msg_encrypt])).encode()
        return hashlib.sha1(signList).hexdigest()

解密使用方法:

# DingTalkCrypto为上面的class
encode_aes_key = '4g5j64qlyl3zvetqxz5jiocdr586fn2zvjpa8zls3ij'
din_corpid = 'suite4xxxxxxxxxxxxxxx'
encrypt = '1a3NBxmCFwkCJvfoQ7WhJHB+iX3qHPsc9JbaDznE1i03peOk1LaOQoRz3+nlyGNhwmwJ3vDMG+OzrHMeiZI7gTRWVdUBmfxjZ8Ej23JVYa9VrYeJ5as7XM/ZpulX8NEQis44w53h1qAgnC3PRzM7Zc/D6Ibr0rgUathB6zRHP8PYrfgnNOS9PhSBdHlegK+AGGanfwjXuQ9+0pZcy0w9lQ=='
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid) 
# encode_aes_key 为秘钥
# din_corpid 为企业corpid
# ----------------------------
# 解密
msg = dtc.decrypt(encrypt)  # encrypt为钉钉回调返回的消息体
print(msg)   # 打印消息

加密使用方法:

encode_aes_key = '4g5j64qlyl3zvetqxz5jiocdr586fn2zvjpa8zls3ij'
din_corpid = 'suite4xxxxxxxxxxxxxxx'
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid)
# 加密
encrypt = dtc.encrypt('success') # 加密数据
timestamp = str(int(round(time.time()))) # 时间戳 (秒)
nonce = dtc.generateRandomKey(8)  # 随机字符串
# 生成签名
signature = dtc.generateSignature(nonce, timestamp, token, encrypt)
# 构造返回数据
new_data = {
    'data': {
        'msg_signature': signature,
        'timeStamp': timestamp,
        'nonce': nonce,
        'encrypt': encrypt
    }
}
return new_data
 类似资料: