selectors — I/O 多路复用的抽象
优质
小牛编辑
134浏览
2023-12-01
用途: 提供独立于平台的基于select模块的I/O多路复用的抽象
运行模式
回显服务端
# selectors_echo_server.py
import selectors
import socket
# Flag polled by the main loop; the read callback clears it once a client
# disconnects.
keep_running = True
# Selector shared by the callbacks and the main loop.
mysel = selectors.DefaultSelector()
def read(connection, mask):
    """Echo pending data back to a client, or tear the connection down.

    Used as the selector callback for client sockets.

    Args:
        connection: The client socket that was reported readable.
        mask: Event mask reported by the selector (not used here).

    Side effects:
        When the peer has closed the connection (empty read) or reset it,
        the socket is unregistered from ``mysel`` and closed, and the
        module-level ``keep_running`` flag is cleared so the main loop
        stops.
    """
    global keep_running

    client_address = connection.getpeername()
    print('read({})'.format(client_address))

    try:
        data = connection.recv(1024)
    except ConnectionError:
        # An abrupt disconnect (e.g. connection reset by peer) is handled
        # like an orderly close instead of propagating with the socket
        # still registered and open.
        data = b''

    if data:
        # A readable client socket has data: echo it straight back.
        # NOTE(review): sendall() on a non-blocking socket can raise
        # BlockingIOError if the send buffer is full; fine for the short
        # messages of this demo.
        print(' received {!r}'.format(data))
        connection.sendall(data)
        return

    # Interpret empty result as closed connection.
    print(' closing')
    mysel.unregister(connection)
    connection.close()
    # Tell the main loop to stop.
    keep_running = False
def accept(sock, mask):
    """Accept a pending client connection and watch it for incoming data.

    Used as the selector callback for the listening socket.

    Args:
        sock: The listening socket that was reported readable.
        mask: Event mask reported by the selector (not used here).

    Side effects:
        The accepted socket is switched to non-blocking mode and registered
        with ``mysel`` for read events, with ``read`` as its callback.
    """
    try:
        new_connection, addr = sock.accept()
    except (BlockingIOError, ConnectionAbortedError):
        # With a non-blocking listening socket the pending connection can
        # disappear between the readiness report and accept(); there is
        # nothing to accept, so just wait for the next event.
        return

    print('accept({})'.format(addr))
    new_connection.setblocking(False)
    mysel.register(new_connection, selectors.EVENT_READ, read)
# Set up the non-blocking listening socket and run the event loop until a
# callback clears ``keep_running``.
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    server.setblocking(False)
    server.bind(server_address)
    server.listen(5)

    # The callback to run for a socket is stored as the key's data.
    mysel.register(server, selectors.EVENT_READ, accept)

    while keep_running:
        print('waiting for I/O')
        # The 1-second timeout lets the loop re-check keep_running even
        # when no socket is ready.
        for key, mask in mysel.select(timeout=1):
            callback = key.data
            callback(key.fileobj, mask)

    print('shutting down')
finally:
    # Closing the selector does not close the sockets registered with it,
    # so close whatever is still registered (listener and any remaining
    # clients) before releasing the selector itself.
    for key in list(mysel.get_map().values()):
        key.fileobj.close()
    mysel.close()
    # Covers the case where setup failed before the listener was
    # registered; closing an already-closed socket is harmless.
    server.close()
回显客户端
# selectors_echo_client.py
import selectors
import socket
# Running totals used to decide when the whole echo has come back.
bytes_sent = 0
bytes_received = 0
# Messages still to be sent. The event loop takes them with pop(), i.e.
# from the end of the list, so the last entry goes out first.
outgoing = [
    b'It will be repeated.',
    b'This is the message. ',
]
# Cleared by the event loop when the client should stop.
keep_running = True
mysel = selectors.DefaultSelector()
# connect() is issued while the socket is still in blocking mode;
# non-blocking mode is switched on only after it has returned.
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address))
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.connect(server_address)
sock.setblocking(False)

# Ask the selector to report both conditions for this socket: ready to
# accept outgoing data, and incoming data available to read.
mysel.register(
    sock,
    events=selectors.EVENT_READ | selectors.EVENT_WRITE,
)
# Event loop: send the queued messages as the socket becomes writable and
# read the echoes back until everything sent has been received (or the
# server closes the connection).
while keep_running:
    print('waiting for I/O')
    for key, mask in mysel.select(timeout=1):
        connection = key.fileobj
        client_address = connection.getpeername()
        print('client({})'.format(client_address))

        if mask & selectors.EVENT_READ:
            print(' ready to read')
            data = connection.recv(1024)
            if data:
                # A readable client socket has data
                print(' received {!r}'.format(data))
                bytes_received += len(data)

            # Stop when the server closed the connection (empty read) or
            # when every queued message has been sent and the same number
            # of bytes has come back. Checking ``outgoing`` avoids
            # stopping early when the echo of the first message arrives
            # before the next one has been sent.
            all_echoed = not outgoing and bytes_received >= bytes_sent
            if not data or all_echoed:
                keep_running = False
                # Do not attempt a write on a finished connection.
                break

        if mask & selectors.EVENT_WRITE:
            print(' ready to write')
            if not outgoing:
                # We are out of messages, so we no longer need to
                # write anything. Change our registration to let
                # us keep reading responses from the server.
                print(' switching to read-only')
                mysel.modify(sock, selectors.EVENT_READ)
            else:
                # Send the next message (taken from the end of the list).
                next_msg = outgoing.pop()
                print(' sending {!r}'.format(next_msg))
                sock.sendall(next_msg)
                bytes_sent += len(next_msg)

print('shutting down')
# Use ``sock`` directly rather than the loop-local ``connection`` so the
# cleanup does not depend on a name bound inside the loop.
mysel.unregister(sock)
sock.close()
mysel.close()