bz2 — bzip2 压缩库
优质
小牛编辑
132浏览
2023-12-01
在内存中单次操作
# bz2_memory.py
import bz2
import binascii
original_data = b'This is the original text.'
print('Original : {} bytes'.format(len(original_data)))
print(original_data)
print()
compressed = bz2.compress(original_data)
print('Compressed : {} bytes'.format(len(compressed)))
hex_version = binascii.hexlify(compressed)
for i in range(len(hex_version) // 40 + 1):
print(hex_version[i * 40:(i + 1) * 40])
print()
decompressed = bz2.decompress(compressed)
print('Decompressed : {} bytes'.format(len(decompressed)))
print(decompressed)
# bz2_lengths.py
import bz2
original_data = b'This is the original text.'
fmt = '{:>15} {:>15}'
print(fmt.format('len(data)', 'len(compressed)'))
print(fmt.format('-' * 15, '-' * 15))
for i in range(5):
data = original_data * i
compressed = bz2.compress(data)
print(fmt.format(len(data), len(compressed)), end='')
print('*' if len(data) < len(compressed) else '')
增量压缩和解压缩
# bz2_incremental.py
import bz2
import binascii
import io
compressor = bz2.BZ2Compressor()
with open('lorem.txt', 'rb') as input:
while True:
block = input.read(64)
if not block:
break
compressed = compressor.compress(block)
if compressed:
print('Compressed: {}'.format(
binascii.hexlify(compressed)))
else:
print('buffering...')
remaining = compressor.flush()
print('Flushed: {}'.format(binascii.hexlify(remaining)))
混合内容流
# bz2_mixed.py
import bz2
lorem = open('lorem.txt', 'rt').read().encode('utf-8')
compressed = bz2.compress(lorem)
combined = compressed + lorem
decompressor = bz2.BZ2Decompressor()
decompressed = decompressor.decompress(combined)
decompressed_matches = decompressed == lorem
print('Decompressed matches lorem:', decompressed_matches)
unused_matches = decompressor.unused_data == lorem
print('Unused data matches lorem :', unused_matches)
写压缩文件
# bz2_file_write.py
import bz2
import io
import os
data = 'Contents of the example file go here.\n'
with bz2.BZ2File('example.bz2', 'wb') as output:
with io.TextIOWrapper(output, encoding='utf-8') as enc:
enc.write(data)
os.system('file example.bz2')
# bz2_file_compresslevel.py
import bz2
import io
import os
data = open('lorem.txt', 'r', encoding='utf-8').read() * 1024
print('Input contains {} bytes'.format(
len(data.encode('utf-8'))))
for i in range(1, 10):
filename = 'compress-level-{}.bz2'.format(i)
with bz2.BZ2File(filename, 'wb', compresslevel=i) as output:
with io.TextIOWrapper(output, encoding='utf-8') as enc:
enc.write(data)
os.system('cksum {}'.format(filename))
# bz2_file_writelines.py
import bz2
import io
import itertools
import os
data = 'The same line, over and over.\n'
with bz2.BZ2File('lines.bz2', 'wb') as output:
with io.TextIOWrapper(output, encoding='utf-8') as enc:
enc.writelines(itertools.repeat(data, 10))
os.system('bzcat lines.bz2')
读压缩文件
# bz2_file_read.py
import bz2
import io
with bz2.BZ2File('example.bz2', 'rb') as input:
with io.TextIOWrapper(input, encoding='utf-8') as dec:
print(dec.read())
# bz2_file_seek.py
import bz2
import contextlib
with bz2.BZ2File('example.bz2', 'rb') as input:
print('Entire file:')
all_data = input.read()
print(all_data)
expected = all_data[5:15]
# rewind to beginning
input.seek(0)
# move ahead 5 bytes
input.seek(5)
print('Starting at position 5 for 10 bytes:')
partial = input.read(10)
print(partial)
print()
print(expected == partial)
读写Unicode数据
# bz2_unicode.py
import bz2
import os
data = 'Character with an åccent.'
with bz2.open('example.bz2', 'wt', encoding='utf-8') as output:
output.write(data)
with bz2.open('example.bz2', 'rt', encoding='utf-8') as input:
print('Full file: {}'.format(input.read()))
# Move to the beginning of the accented character.
with bz2.open('example.bz2', 'rt', encoding='utf-8') as input:
input.seek(18)
print('One character: {}'.format(input.read(1)))
# Move to the middle of the accented character.
with bz2.open('example.bz2', 'rt', encoding='utf-8') as input:
input.seek(19)
try:
print(input.read(1))
except UnicodeDecodeError:
print('ERROR: failed to decode')
压缩网络数据
# bz2_server.py
import bz2
import logging
import socketserver
import binascii
BLOCK_SIZE = 32
class Bz2RequestHandler(socketserver.BaseRequestHandler):
logger = logging.getLogger('Server')
def handle(self):
compressor = bz2.BZ2Compressor()
# Find out what file the client wants
filename = self.request.recv(1024).decode('utf-8')
self.logger.debug('client asked for: "%s"', filename)
# Send chunks of the file as they are compressed
with open(filename, 'rb') as input:
while True:
block = input.read(BLOCK_SIZE)
if not block:
break
self.logger.debug('RAW %r', block)
compressed = compressor.compress(block)
if compressed:
self.logger.debug(
'SENDING %r',
binascii.hexlify(compressed))
self.request.send(compressed)
else:
self.logger.debug('BUFFERING')
# Send any data being buffered by the compressor
remaining = compressor.flush()
while remaining:
to_send = remaining[:BLOCK_SIZE]
remaining = remaining[BLOCK_SIZE:]
self.logger.debug('FLUSHING %r',
binascii.hexlify(to_send))
self.request.send(to_send)
return
if __name__ == '__main__':
import socket
import sys
from io import StringIO
import threading
logging.basicConfig(level=logging.DEBUG,
format='%(name)s: %(message)s',
)
# Set up a server, running in a separate thread
address = ('localhost', 0) # let the kernel assign a port
server = socketserver.TCPServer(address, Bz2RequestHandler)
ip, port = server.server_address # what port was assigned?
t = threading.Thread(target=server.serve_forever)
t.setDaemon(True)
t.start()
logger = logging.getLogger('Client')
# Connect to the server
logger.info('Contacting server on %s:%s', ip, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, port))
# Ask for a file
requested_file = (sys.argv[0]
if len(sys.argv) > 1
else 'lorem.txt')
logger.debug('sending filename: "%s"', requested_file)
len_sent = s.send(requested_file.encode('utf-8'))
# Receive a response
buffer = StringIO()
decompressor = bz2.BZ2Decompressor()
while True:
response = s.recv(BLOCK_SIZE)
if not response:
break
logger.debug('READ %r', binascii.hexlify(response))
# Include any unconsumed data when feeding the
# decompressor.
decompressed = decompressor.decompress(response)
if decompressed:
logger.debug('DECOMPRESSED %r', decompressed)
buffer.write(decompressed.decode('utf-8'))
else:
logger.debug('BUFFERING')
full_response = buffer.getvalue()
lorem = open(requested_file, 'rt').read()
logger.debug('response matches file contents: %s',
full_response == lorem)
# Clean up
server.shutdown()
server.socket.close()
s.close()