原文链接:https://sxfblog.com/index.php/archives/388.html
在此先做记录,目前正在开发钉钉微应用,后续肯定会用到的
官方参考文档:https://open-doc.dingtalk.com/microapp/faquestions/ltr370
官方的python sdk没有回调的消息体加解密,只有java、php、c#。上网也找了一些资料,终于整好了,下面的Python代码为Python3.5版本的,python2的代码就直接改一下部分库。或则语法即可。
直接上加解密Class: (将这个class单独整成一个.py供后续的引用和调用)
# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct
from random import choice
from Crypto.Cipher import AES
class DingTalkCrypto:
def __init__(self, encodingAesKey, key):
self.encodingAesKey = encodingAesKey
self.key = key
self.aesKey = base64.b64decode(self.encodingAesKey + '=')
def encrypt(self, content):
"""
加密
"""
msg_len = self.length(content)
content = self.generateRandomKey(16) + msg_len.decode() + content + self.key
contentEncode = self.pks7encode(content)
iv = self.aesKey[:16]
aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
aesEncrypt = aesEncode.encrypt(contentEncode)
return base64.b64encode(aesEncrypt).decode().replace('\n', '')
def length(self, content):
"""
将msg_len转为符合要求的四位字节长度
"""
l = len(content)
return struct.pack('>l', l)
def pks7encode(self, content):
"""
安装 PKCS#7 标准填充字符串
"""
l = len(content)
output = io.StringIO()
val = 32 - (l % 32)
for _ in xrange(val):
output.write('%02x' % val)
return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())
def pks7decode(self, content):
nl = len(content)
val = int(binascii.hexlify(content[-1].encode()), 16)
if val > 32:
raise ValueError('Input is not padded or padding is corrupt')
l = nl - val
return content[:l]
def decrypt(self, content):
"""
解密数据
"""
# 钉钉返回的消息体
content = base64.b64decode(content)
iv = self.aesKey[:16] # 初始向量
aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
decodeRes = aesDecode.decrypt(content)[20:].decode().replace(self.key, '')
# 获取去除初始向量,四位msg长度以及尾部corpid
return self.pks7decode(decodeRes)
def generateRandomKey(self, size,
chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
"""
生成加密所需要的随机字符串
"""
return ''.join(choice(chars) for i in range(size))
def generateSignature(self, nonce, timestamp, token, msg_encrypt):
"""
生成签名
"""
signList = ''.join(sorted([nonce, timestamp, token, msg_encrypt])).encode()
return hashlib.sha1(signList).hexdigest()
解密使用方法:
# DingTalkCrypto为上面的class
encode_aes_key = '4g5j64qlyl3zvetqxz5jiocdr586fn2zvjpa8zls3ij'
din_corpid = 'suite4xxxxxxxxxxxxxxx'
encrypt = '1a3NBxmCFwkCJvfoQ7WhJHB+iX3qHPsc9JbaDznE1i03peOk1LaOQoRz3+nlyGNhwmwJ3vDMG+OzrHMeiZI7gTRWVdUBmfxjZ8Ej23JVYa9VrYeJ5as7XM/ZpulX8NEQis44w53h1qAgnC3PRzM7Zc/D6Ibr0rgUathB6zRHP8PYrfgnNOS9PhSBdHlegK+AGGanfwjXuQ9+0pZcy0w9lQ=='
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid)
# encode_aes_key 为秘钥
# din_corpid 为企业corpid
# ----------------------------
# 解密
msg = dtc.decrypt(encrypt) # encrypt为钉钉回调返回的消息体
print(msg) # 打印消息
加密使用方法:
encode_aes_key = '4g5j64qlyl3zvetqxz5jiocdr586fn2zvjpa8zls3ij'
din_corpid = 'suite4xxxxxxxxxxxxxxx'
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid)
# 加密
encrypt = dtc.encrypt('success') # 加密数据
timestamp = str(int(round(time.time()))) # 时间戳 (秒)
nonce = dtc.generateRandomKey(8) # 随机字符串
# 生成签名
signature = dtc.generateSignature(nonce, timestamp, token, encrypt)
# 构造返回数据
new_data = {
'data': {
'msg_signature': signature,
'timeStamp': timestamp,
'nonce': nonce,
'encrypt': encrypt
}
}
return new_data