平时使用过程中总是使用公钥加密, 私钥解密; 但需求来了, 也只有客户至上; 说怎么弄就怎么弄呗
而对于pycryptodome提供的方法则只允许公钥加密, 私钥解密; 也就是我们常用的方式;
为了解决这个问题, 只有去研究他的源码了;
基于库: pycryptodome==3.14.1
我们知道RSA2048是非对称加密算法, 秘钥对必然是一个加密一个解密; 基于此想到: 他们的算法是互逆的
也就是说,
如果我想用私钥来加密, 只需要调用库中已实现的解密的逻辑; 而在公钥解密的过程, 只需要调用库中已实现的公钥加密逻辑即可
经过实践, 证明了 openssl
中该算法确实也是如此实现的, 故而问题得以解决
class RsaEncryptor:
def __init__(self, is_2048=True):
self.is_2048 = is_2048
self.device_pem = b''
self.device_pub = b''
self.m2s_pem = b''
self.m2s_pub = b''
self.load_cert()
def encrypt_pub(self, origin_str: str, pub: bytes):
"""
单次加密串的长度最大为 (key_size/8)-11
1024bit的证书用100, 2048bit的证书用 200
"""
step = 200 if self.is_2048 else 100
origin_str = origin_str.encode('utf-8')
cipher_public = PKCS1_v1_5.new(RSA.importKey(pub))
res = []
for i in range(0, len(origin_str), step):
res.append(cipher_public.encrypt(origin_str[i:i + step]))
return base64.b64encode(b"".join(res)).decode()
def decrypt_pem(self, enc_str, pem: bytes):
"""
1024bit的证书用128,2048bit证书用256位
"""
step = 256 if self.is_2048 else 128
enc_str = enc_str.encode('utf-8')
enc_str = base64.b64decode(enc_str)
cipher_private = PKCS1_v1_5.new(RSA.importKey(pem))
res = []
for i in range(0, len(enc_str), step):
res.append(cipher_private.decrypt(enc_str[i:i + step], Random.new().read).decode())
return ''.join(res)
def encrypt_pem(self, origin_str, pem):
step = 200 if self.is_2048 else 100
if isinstance(origin_str, str):
origin_str = origin_str.encode('utf-8')
cipher = PKCS1_v1_5.new(RSA.importKey(pem))
res = []
for i in range(0, len(origin_str), step):
tmp = origin_str[i:i + step]
k = cipher._key.size_in_bytes()
mLen = len(tmp)
ps = []
while len(ps) != k - mLen - 3:
new_byte = cipher._randfunc(1)
if bord(new_byte[0]) == 0x00:
continue
ps.append(new_byte)
ps = b"".join(ps)
assert (len(ps) == k - mLen - 3)
em = b'\x00\x02' + ps + b'\x00' + _copy_bytes(None, None, tmp)
em_int = bytes_to_long(em)
m_int = cipher._key._decrypt(em_int)
res.append(long_to_bytes(m_int, k))
return base64.b64encode(b"".join(res)).decode()
def decrypt_pub(self, enc_str, pem: bytes):
step = 256 if self.is_2048 else 128
enc_str = enc_str.encode('utf-8')
enc_str = base64.b64decode(enc_str)
cipher = PKCS1_v1_5.new(RSA.importKey(pem))
res = []
for i in range(0, len(enc_str), step):
tmp = enc_str[i:i + step]
k = cipher._key.size_in_bytes()
ct_int = bytes_to_long(tmp)
m_int = cipher._key._encrypt(ct_int)
em = long_to_bytes(m_int, k)
sep_idx = em.index(b"\x00", 2)
res.append(em[sep_idx + 1:].decode())
return ''.join(res)
def load_cert(self):
cert_map = {
'device_pem': './keys/EDLAuthKey-2048.pem',
'device_pub': './keys/EDLAuthKey-2048.pub',
'm2s_pem': './keys/EDLFactoryKey-2048.pem',
'm2s_pub': './keys/EDLFactoryKey-2048.pub',
}
for field, item in cert_map.items():
with open(item, 'rb') as rf:
self.__dict__[field] = rf.read()
encryptor = RsaEncryptor()