当前位置: 首页 > 工具软件 > Python grpc > 使用案例 >

Python GRPC客户端实现ssl连接

郑乐池
2023-12-01

实现过程中的坑:

  1. 认证
  2. 客户端也需要设置传输过程中send,receive的文件大小
  3. AES加密解密

一:pfx文件不能直接使用,先要转成pem或crt文件

二:客户端也需要设置传输过程中send,receive的文件大小,否则稍大的文件在传输过程中会报错

class BaseClient(object):
    """client connection"""

    def __init__(self, host='', port=''):
        pem_file = os.path.join(settings.PEM, 'server.pem')
        with open(pem_file, 'rb') as f:
            cert_bytes = f.read()
        self.host = host
        self.port = port
        creds = grpc.ssl_channel_credentials(root_certificates=cert_bytes)
        # 最大发送和接收为20M
        self.conn = grpc.secure_channel(target=host + ':' + port, credentials=creds,
                                        options=[('grpc.max_send_message_length', 20 * 1024 * 1024),
                                                 ('grpc.max_receive_message_length', 20 * 1024 * 1024)], )

三.AES加密解密
自己写方法进行填充会导致加密过程错误。保险起见,加密过程中填充明文的过程要使用cryptography包。解密后的冗余需要进行去除。

from Crypto.Cipher import AES
import base64
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import algorithms

class Aescrypt:
    def __init__(self, key, iv_input):
        self.key = key.encode(encoding="utf-8")
        self.iv = base64.b64decode(iv_input.encode(encoding="utf-8"))

    @staticmethod
    def pkcs7_padding(data):
        """AES使用pkcs7填充模式"""
        if not isinstance(data, bytes):
            data = data.encode()
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        return padded_data

    def aes_encrypt(self, text_input):
        """一.对明文进行填充(方式为pkcs7)
           二.使用AES new 方法创建一个对象(模式为CBC)
           三.使用encrypt方法对填充后的明文进行加密
           四.使用base64encode进行编码
        """
        text_input = text_input.encode(encoding="utf-8")
        padding_text = self.pkcs7_padding(text_input)
        aes = AES.new(self.key, AES.MODE_CBC, self.iv)
        encrypt_text = base64.b64encode(aes.encrypt(padding_text)).decode(encoding="utf-8")
        return encrypt_text

    def aes_decrypt(self, text_input):
        """一.使用base64decode进行解码
           二.使用AES new 方法创建一个对象(模式为CBC)
           三.使用decrypt方法对密文进行解码
           四.使用strip进行冗余去除
        """
        text_input = base64.b64decode(text_input.encode(encoding="utf-8"))
        aes = AES.new(self.key, AES.MODE_CBC, self.iv)
        decrypt_text = aes.decrypt(text_input)
        decrypt_text_de = decrypt_text.strip(b"\x00").strip(b"\x01").strip(b"\x02").strip(b"\x03").strip(b"\x04").strip(
            b"\x05").strip(b"\x06").strip(b"\x07").strip(b"\x08").strip(b"\x09").strip(b"\x10").strip(b"\x0a").strip(
            b"\x0b").strip(b"\x0c").strip(b"\x0d").strip(b"\x0e").strip(b"\x0f").decode(encoding="utf-8")
        return decrypt_text_de

 类似资料: