当前位置: 首页 > 工具软件 > PyCryptodome > 使用案例 >

python3.9中通过pycryptodome实现RSA2048的私钥加密公钥解密

裴学
2023-12-01

平时使用过程中总是使用公钥加密, 私钥解密; 但需求来了, 也只有客户至上; 说怎么弄就怎么弄呗
而对于pycryptodome提供的方法则只允许公钥加密, 私钥解密; 也就是我们常用的方式;
为了解决这个问题, 只有去研究他的源码了;
基于库: pycryptodome==3.14.1

概述

我们知道RSA2048是非对称加密算法, 秘钥对必然是一个加密一个解密; 基于此想到: 他们的算法是互逆的
也就是说,
如果我想用私钥来加密, 只需要调用库中已实现的解密的逻辑; 而在公钥解密的过程, 只需要调用库中已实现的公钥加密逻辑即可
经过实践, 证明了 openssl中该算法确实也是如此实现的, 故而问题得以解决

实现情况

class RsaEncryptor:
    def __init__(self, is_2048=True):
        self.is_2048 = is_2048
        self.device_pem = b''
        self.device_pub = b''
        self.m2s_pem = b''
        self.m2s_pub = b''
        self.load_cert()

    def encrypt_pub(self, origin_str: str, pub: bytes):
        """
        单次加密串的长度最大为 (key_size/8)-11
        1024bit的证书用100, 2048bit的证书用 200
        """
        step = 200 if self.is_2048 else 100
        origin_str = origin_str.encode('utf-8')
        cipher_public = PKCS1_v1_5.new(RSA.importKey(pub))
        res = []
        for i in range(0, len(origin_str), step):
            res.append(cipher_public.encrypt(origin_str[i:i + step]))
        return base64.b64encode(b"".join(res)).decode()

    def decrypt_pem(self, enc_str, pem: bytes):
        """
        1024bit的证书用128,2048bit证书用256位
        """
        step = 256 if self.is_2048 else 128
        enc_str = enc_str.encode('utf-8')
        enc_str = base64.b64decode(enc_str)
        cipher_private = PKCS1_v1_5.new(RSA.importKey(pem))
        res = []
        for i in range(0, len(enc_str), step):
            res.append(cipher_private.decrypt(enc_str[i:i + step], Random.new().read).decode())
        return ''.join(res)

    def encrypt_pem(self, origin_str, pem):
        step = 200 if self.is_2048 else 100
        if isinstance(origin_str, str):
            origin_str = origin_str.encode('utf-8')
        cipher = PKCS1_v1_5.new(RSA.importKey(pem))
        res = []
        for i in range(0, len(origin_str), step):
            tmp = origin_str[i:i + step]

            k = cipher._key.size_in_bytes()
            mLen = len(tmp)
            ps = []
            while len(ps) != k - mLen - 3:
                new_byte = cipher._randfunc(1)
                if bord(new_byte[0]) == 0x00:
                    continue
                ps.append(new_byte)
            ps = b"".join(ps)
            assert (len(ps) == k - mLen - 3)
            em = b'\x00\x02' + ps + b'\x00' + _copy_bytes(None, None, tmp)
            em_int = bytes_to_long(em)
            m_int = cipher._key._decrypt(em_int)
            res.append(long_to_bytes(m_int, k))
        return base64.b64encode(b"".join(res)).decode()

    def decrypt_pub(self, enc_str, pem: bytes):
        step = 256 if self.is_2048 else 128
        enc_str = enc_str.encode('utf-8')
        enc_str = base64.b64decode(enc_str)
        cipher = PKCS1_v1_5.new(RSA.importKey(pem))
        res = []
        for i in range(0, len(enc_str), step):
            tmp = enc_str[i:i + step]

            k = cipher._key.size_in_bytes()
            ct_int = bytes_to_long(tmp)
            m_int = cipher._key._encrypt(ct_int)
            em = long_to_bytes(m_int, k)
            sep_idx = em.index(b"\x00", 2)
            res.append(em[sep_idx + 1:].decode())
        return ''.join(res)

    def load_cert(self):
        cert_map = {
            'device_pem': './keys/EDLAuthKey-2048.pem',
            'device_pub': './keys/EDLAuthKey-2048.pub',
            'm2s_pem': './keys/EDLFactoryKey-2048.pem',
            'm2s_pub': './keys/EDLFactoryKey-2048.pub',
        }
        for field, item in cert_map.items():
            with open(item, 'rb') as rf:
                self.__dict__[field] = rf.read()


encryptor = RsaEncryptor()
 类似资料: