zlib — GNU zlib 压缩库
优质
小牛编辑
131浏览
2023-12-01
处理内存中的数据
# zlib_memory.py
import zlib
import binascii

# Round-trip a short byte string through zlib entirely in memory:
# compress it, expand it again, then report the size and content of
# each stage.
original_data = b'This is the original text.'
compressed = zlib.compress(original_data)
decompressed = zlib.decompress(compressed)

# (label, bytes whose length is reported, value actually shown).
# The compressed bytes are shown hex-encoded so they stay printable.
report = (
    ('Original :', original_data, original_data),
    ('Compressed :', compressed, binascii.hexlify(compressed)),
    ('Decompressed :', decompressed, decompressed),
)
for label, payload, shown in report:
    print(label, len(payload), shown)
# zlib_lengths.py
import zlib

# Compare input size with compressed size as the input grows from zero
# to four copies of the same sentence.  Rows where the compressed form
# is larger than the input are flagged with '*'.
original_data = b'This is the original text.'

template = '{:>15} {:>15}'
print(template.format('len(data)', 'len(compressed)'))
print(template.format('-' * 15, '-' * 15))

for i in range(5):
    # i == 0 deliberately exercises the empty-input case.
    data = original_data * i
    compressed = zlib.compress(data)
    highlight = '*' if len(data) < len(compressed) else ''
    print(template.format(len(data), len(compressed)), highlight)
# zlib_compresslevel.py
import zlib

# Compress the same highly repetitive input at every zlib level (0-9)
# and print the resulting size for each level.
input_data = b'Some repeated text.\n' * 1024

template = '{:>5} {:>5}'
print(template.format('Level', 'Size'))
print(template.format('-----', '----'))

for level in range(10):
    compressed = zlib.compress(input_data, level)
    print(template.format(level, len(compressed)))
增量压缩和解压缩
# zlib_incremental.py
import zlib
import binascii

# Compress 'lorem.txt' incrementally, feeding the compressor 64 bytes
# at a time instead of loading the whole file into memory.
compressor = zlib.compressobj(1)

with open('lorem.txt', 'rb') as source:
    # iter() with a sentinel keeps calling read() until it returns b''
    # at end of file.
    for block in iter(lambda: source.read(64), b''):
        compressed = compressor.compress(block)
        if compressed:
            print('Compressed: {}'.format(
                binascii.hexlify(compressed)))
        else:
            # compress() returned nothing for this block; the data is
            # still held inside the compressor.
            print('buffering...')

# flush() returns whatever the compressor was still holding.
remaining = compressor.flush()
print('Flushed: {}'.format(binascii.hexlify(remaining)))
混合内容流
# zlib_mixed.py
import zlib

# Build a buffer holding a complete zlib stream followed directly by
# uncompressed bytes, then check that the decompressor stops at the end
# of the compressed stream and exposes the rest as unused_data.
with open('lorem.txt', 'rb') as f:  # close the file deterministically
    lorem = f.read()

compressed = zlib.compress(lorem)
combined = compressed + lorem

decompressor = zlib.decompressobj()
decompressed = decompressor.decompress(combined)

decompressed_matches = decompressed == lorem
print('Decompressed matches lorem:', decompressed_matches)

unused_matches = decompressor.unused_data == lorem
print('Unused data matches lorem :', unused_matches)
校验和
# zlib_checksums.py
import zlib

# Compute Adler-32 and CRC-32 checksums of 'lorem.txt'.  For each
# algorithm the second line feeds the same data again, passing the
# first result as the starting value.
with open('lorem.txt', 'rb') as f:  # close the file deterministically
    data = f.read()

cksum = zlib.adler32(data)
print('Adler32: {:12d}'.format(cksum))
print(' : {:12d}'.format(zlib.adler32(data, cksum)))

cksum = zlib.crc32(data)
print('CRC-32 : {:12d}'.format(cksum))
print(' : {:12d}'.format(zlib.crc32(data, cksum)))
网络数据压缩
# zlib_server.py
import zlib
import logging
import socketserver
import binascii
BLOCK_SIZE = 64  # bytes read from the file / socket per step


class ZlibRequestHandler(socketserver.BaseRequestHandler):
    """Serve one file per connection as a zlib-compressed stream.

    The client sends a file name; the handler reads that file in
    BLOCK_SIZE chunks, compresses them incrementally and writes the
    compressed bytes back on the same connection.
    """

    logger = logging.getLogger('Server')

    def handle(self):
        """Read the requested file name and stream the file compressed."""
        compressor = zlib.compressobj(1)

        # Find out what file the client wants
        # NOTE(review): the name comes straight from the client and is
        # opened unchecked, so any file readable by this process can be
        # requested.  Acceptable for a demo only; validate before reuse.
        filename = self.request.recv(1024).decode('utf-8')
        self.logger.debug('client asked for: %r', filename)

        # Send chunks of the file as they are compressed
        with open(filename, 'rb') as source:
            # iter() with a sentinel stops when read() returns b'' (EOF).
            for block in iter(lambda: source.read(BLOCK_SIZE), b''):
                self.logger.debug('RAW %r', block)
                compressed = compressor.compress(block)
                if compressed:
                    self.logger.debug(
                        'SENDING %r',
                        binascii.hexlify(compressed))
                    # sendall() keeps writing until every byte is sent;
                    # plain send() may transmit only part of the data.
                    self.request.sendall(compressed)
                else:
                    self.logger.debug('BUFFERING')

        # Send any data being buffered by the compressor
        remaining = compressor.flush()
        while remaining:
            to_send = remaining[:BLOCK_SIZE]
            remaining = remaining[BLOCK_SIZE:]
            self.logger.debug('FLUSHING %r',
                              binascii.hexlify(to_send))
            self.request.sendall(to_send)
if __name__ == '__main__':
    # Demo driver: start the server in a background thread, request
    # 'lorem.txt' as a client, decompress the reply incrementally and
    # compare it with the file on disk.
    import socket
    import threading
    from io import BytesIO

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(name)s: %(message)s',
    )
    logger = logging.getLogger('Client')

    # Set up a server, running in a separate thread
    address = ('localhost', 0)  # let the kernel assign a port
    server = socketserver.TCPServer(address, ZlibRequestHandler)
    ip, port = server.server_address  # what port was assigned?

    # daemon=True replaces the deprecated Thread.setDaemon() call.
    t = threading.Thread(target=server.serve_forever, daemon=True)
    t.start()

    try:
        # Connect to the server as a client
        logger.info('Contacting server on %s:%s', ip, port)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((ip, port))

            # Ask for a file
            requested_file = 'lorem.txt'
            logger.debug('sending filename: %r', requested_file)
            # sendall() guarantees the whole name is written.
            s.sendall(requested_file.encode('utf-8'))

            # Receive a response
            buffer = BytesIO()
            decompressor = zlib.decompressobj()
            while True:
                response = s.recv(BLOCK_SIZE)
                if not response:
                    # Empty read: the server closed the connection.
                    break
                logger.debug('READ %r', binascii.hexlify(response))

                # Include any unconsumed data when
                # feeding the decompressor.
                to_decompress = decompressor.unconsumed_tail + response
                while to_decompress:
                    decompressed = decompressor.decompress(to_decompress)
                    if decompressed:
                        logger.debug('DECOMPRESSED %r', decompressed)
                        buffer.write(decompressed)
                        # Look for unconsumed data due to buffer overflow
                        to_decompress = decompressor.unconsumed_tail
                    else:
                        # Nothing came out yet; wait for more input.
                        logger.debug('BUFFERING')
                        to_decompress = None

        # Deal with data remaining inside the decompressor buffer
        remainder = decompressor.flush()
        if remainder:
            logger.debug('FLUSHED %r', remainder)
            buffer.write(remainder)

        full_response = buffer.getvalue()
        with open('lorem.txt', 'rb') as f:
            lorem = f.read()
        logger.debug('response matches file contents: %s',
                     full_response == lorem)
    finally:
        # Clean up: stop the serve_forever() loop, then release the
        # listening socket.
        server.shutdown()
        server.server_close()