signal — 异步系统事件
优质
小牛编辑
126浏览
2023-12-01
接收信号
import signal
import os
import time
def receive_signal(signum, stack):
    """Report a delivered signal on standard output.

    Args:
        signum: Number of the signal that was delivered.
        stack: Second argument passed to a signal handler (the interrupted
            stack frame); not used here.
    """
    print('Received:', signum)
# Register the same handler for both user-defined signals.
signal.signal(signal.SIGUSR1, receive_signal)
signal.signal(signal.SIGUSR2, receive_signal)

# Print the process ID so it can be used with 'kill'
# to send this program signals.
print('My PID is:', os.getpid())

# Loop forever, printing a message every 3 seconds; the loop has no exit
# condition of its own.
while True:
    print('Waiting...')
    time.sleep(3)
检索注册处理程序
import signal
def alarm_received(n, stack):
    """Do nothing; exists only so a Python callable can be registered.

    Args:
        n: Number of the delivered signal; unused.
        stack: Interrupted stack frame; unused.

    Returns:
        None.
    """
    return
# Install a Python-level handler so that SIGALRM reports a callable
# instead of one of the SIG_DFL / SIG_IGN constants below.
signal.signal(signal.SIGALRM, alarm_received)

# Map every signal number exported by the module to its name. Names that
# contain an underscore (e.g. SIG_DFL, SIG_IGN) are skipped by the filter.
signals_to_names = {
    getattr(signal, n): n
    for n in dir(signal)
    if n.startswith('SIG') and '_' not in n
}

# Report the handler registered for each signal, in numeric order.
for s, name in sorted(signals_to_names.items()):
    handler = signal.getsignal(s)
    # Replace the two special constants with readable labels; any other
    # value is printed as-is.
    if handler is signal.SIG_DFL:
        handler = 'SIG_DFL'
    elif handler is signal.SIG_IGN:
        handler = 'SIG_IGN'
    print('{:<10} ({:2d}):'.format(name, s), handler)
发送信号
时钟
import signal
import time
def receive_alarm(signum, stack):
    """Print the current time, labelled as the moment the alarm arrived.

    Args:
        signum: Number of the delivered signal; unused.
        stack: Interrupted stack frame; unused.
    """
    print('Alarm :', time.ctime())
# Call receive_alarm in 2 seconds: install the handler first, then
# schedule the SIGALRM.
signal.signal(signal.SIGALRM, receive_alarm)
signal.alarm(2)
# Sleep for longer (4 s) than the alarm delay (2 s) so the alarm comes due
# while the program is sleeping; the timestamps show the ordering.
print('Before:', time.ctime())
time.sleep(4)
print('After :', time.ctime())
忽略信号
import signal
import os
import time
def do_exit(sig, stack):
    """Terminate the program by raising SystemExit.

    Args:
        sig: Number of the delivered signal; unused.
        stack: Interrupted stack frame; unused.

    Raises:
        SystemExit: Always, with the message 'Exiting'.
    """
    raise SystemExit('Exiting')
# Ignore SIGINT entirely, and make SIGUSR1 the way to stop the program
# (its handler raises SystemExit).
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGUSR1, do_exit)
# Show the process ID, then block until a signal is received.
print('My PID:', os.getpid())
signal.pause()
信号与线程
import signal
import threading
import os
import time
def signal_handler(num, stack):
    """Print the signal number and the name of the thread handling it.

    Args:
        num: Number of the delivered signal.
        stack: Interrupted stack frame; unused.
    """
    # current_thread() replaces the camelCase alias currentThread(),
    # which is deprecated since Python 3.10.
    print('Received signal {} in {}'.format(
        num, threading.current_thread().name))
# Route SIGUSR1 to signal_handler for the rest of this example.
signal.signal(signal.SIGUSR1, signal_handler)
def wait_for_signal():
    """Block in signal.pause() and report before and after the wait.

    Prints the name of the calling thread, waits in signal.pause(), and
    prints 'Done waiting' once pause() returns.
    """
    # current_thread() replaces the deprecated currentThread() alias.
    print('Waiting for signal in',
          threading.current_thread().name)
    signal.pause()
    print('Done waiting')
# Run wait_for_signal in a secondary thread named 'receiver'; this thread
# will not receive the signal.
receiver = threading.Thread(target=wait_for_signal, name='receiver')
receiver.start()
# Short pause before the script continues, so the receiver thread has
# time to start running.
time.sleep(0.1)
def send_signal():
    """Send SIGUSR1 to the current process and report the sending thread."""
    # current_thread() replaces the deprecated currentThread() alias.
    print('Sending signal in', threading.current_thread().name)
    os.kill(os.getpid(), signal.SIGUSR1)
# Send the signal from a second thread and wait for that thread to finish.
sender = threading.Thread(target=send_signal, name='sender')
sender.start()
sender.join()
# Wait for the thread to see the signal (not going to happen!)
print('Waiting for', receiver.name)
# Schedule a SIGALRM in 2 seconds before joining. No SIGALRM handler is
# registered in this example, so this is presumably a safeguard that stops
# the join below from blocking forever (see the signal module docs).
signal.alarm(2)
receiver.join()
import signal
import time
import threading
def signal_handler(num, stack):
    """Print a timestamp and the name of the thread handling the alarm.

    Args:
        num: Number of the delivered signal; unused.
        stack: Interrupted stack frame; unused.
    """
    # current_thread() replaces the deprecated currentThread() alias.
    print(time.ctime(), 'Alarm in',
          threading.current_thread().name)
# Route SIGALRM to signal_handler for the rest of this example.
signal.signal(signal.SIGALRM, signal_handler)
def use_alarm():
    """Schedule a 1-second alarm from the calling thread, then sleep 3 s.

    Prints a timestamped message before setting the alarm, before sleeping
    and after the sleep finishes, each tagged with the calling thread's name.
    """
    # current_thread() replaces the deprecated currentThread() alias.
    t_name = threading.current_thread().name
    print(time.ctime(), 'Setting alarm in', t_name)
    signal.alarm(1)
    print(time.ctime(), 'Sleeping in', t_name)
    # Sleep for longer than the alarm delay so the alarm comes due while
    # this thread is still sleeping.
    time.sleep(3)
    print(time.ctime(), 'Done with sleep in', t_name)
# Run use_alarm in a secondary thread named 'alarm_thread'; this thread
# will not receive the signal.
alarm_thread = threading.Thread(target=use_alarm, name='alarm_thread')
alarm_thread.start()
# Short pause before the script continues, so the thread has time to
# start running.
time.sleep(0.1)

# Wait for the thread to see the signal (not going to happen!)
print(time.ctime(), 'Waiting for', alarm_thread.name)
alarm_thread.join()
print(time.ctime(), 'Exiting normally')