subprocess — 子进程操作库
优质
小牛编辑
131浏览
2023-12-01
运行外部命令
import subprocess
completed = subprocess.run(['ls', '-l'])
print('returncode:', completed.returncode)
import subprocess
completed = subprocess.run('echo $HOME', shell=True)
print('returncode:', completed.returncode)
错误处理
import subprocess
try:
subprocess.run(['false'], check=True)
except subprocess.CalledProcessError as err:
print('ERROR:', err)
捕获输出
import subprocess
completed = subprocess.run(
['ls', '-1'],
stdout=subprocess.PIPE,
)
print('returncode:', completed.returncode)
print('Have {} bytes in stdout:\n{}'.format(
len(completed.stdout),
completed.stdout.decode('utf-8'))
)
import subprocess
try:
completed = subprocess.run(
'echo to stdout; echo to stderr 1>&2; exit 1',
check=True,
shell=True,
stdout=subprocess.PIPE,
)
except subprocess.CalledProcessError as err:
print('ERROR:', err)
else:
print('returncode:', completed.returncode)
print('Have {} bytes in stdout: {!r}'.format(
len(completed.stdout),
completed.stdout.decode('utf-8'))
)
import subprocess
try:
completed = subprocess.run(
'echo to stdout; echo to stderr 1>&2; exit 1',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as err:
print('ERROR:', err)
else:
print('returncode:', completed.returncode)
print('Have {} bytes in stdout: {!r}'.format(
len(completed.stdout),
completed.stdout.decode('utf-8'))
)
print('Have {} bytes in stderr: {!r}'.format(
len(completed.stderr),
completed.stderr.decode('utf-8'))
)
import subprocess
try:
output = subprocess.check_output(
'echo to stdout; echo to stderr 1>&2',
shell=True,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as err:
print('ERROR:', err)
else:
print('Have {} bytes in output: {!r}'.format(
len(output),
output.decode('utf-8'))
)
隐藏输出
import subprocess
try:
completed = subprocess.run(
'echo to stdout; echo to stderr 1>&2; exit 1',
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError as err:
print('ERROR:', err)
else:
print('returncode:', completed.returncode)
print('stdout is {!r}'.format(completed.stdout))
print('stderr is {!r}'.format(completed.stderr))
与管道直接合作
单向通信进程
import subprocess
print('read:')
proc = subprocess.Popen(
['echo', '"to stdout"'],
stdout=subprocess.PIPE,
)
stdout_value = proc.communicate()[0].decode('utf-8')
print('stdout:', repr(stdout_value))
import subprocess
print('write:')
proc = subprocess.Popen(
['cat', '-'],
stdin=subprocess.PIPE,
)
proc.communicate('stdin: to stdin\n'.encode('utf-8'))
双向通信进程
import subprocess
print('popen2:')
proc = subprocess.Popen(
['cat', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
msg = 'through stdin to stdout'.encode('utf-8')
stdout_value = proc.communicate(msg)[0].decode('utf-8')
print('pass through:', repr(stdout_value))
捕获错误输出
import subprocess
print('popen3:')
proc = subprocess.Popen(
'cat -; echo "to stderr" 1>&2',
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
msg = 'through stdin to stdout'.encode('utf-8')
stdout_value, stderr_value = proc.communicate(msg)
print('pass through:', repr(stdout_value.decode('utf-8')))
print('stderr :', repr(stderr_value.decode('utf-8')))
Combining Regular and Error Output
import subprocess
print('popen4:')
proc = subprocess.Popen(
'cat -; echo "to stderr" 1>&2',
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
msg = 'through stdin to stdout\n'.encode('utf-8')
stdout_value, stderr_value = proc.communicate(msg)
print('combined output:', repr(stdout_value.decode('utf-8')))
print('stderr value :', repr(stderr_value))
Connecting Segments of a Pipe
import subprocess
cat = subprocess.Popen(
['cat', 'index.rst'],
stdout=subprocess.PIPE,
)
grep = subprocess.Popen(
['grep', '.. literalinclude::'],
stdin=cat.stdout,
stdout=subprocess.PIPE,
)
cut = subprocess.Popen(
['cut', '-f', '3', '-d:'],
stdin=grep.stdout,
stdout=subprocess.PIPE,
)
end_of_pipe = cut.stdout
print('Included files:')
for line in end_of_pipe:
print(line.decode('utf-8').strip())
与另一个命令交互
import sys
sys.stderr.write('repeater.py: starting\n')
sys.stderr.flush()
while True:
next_line = sys.stdin.readline()
sys.stderr.flush()
if not next_line:
break
sys.stdout.write(next_line)
sys.stdout.flush()
sys.stderr.write('repeater.py: exiting\n')
sys.stderr.flush()
import io
import subprocess
print('One line at a time:')
proc = subprocess.Popen(
'python3 repeater.py',
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdin = io.TextIOWrapper(
proc.stdin,
encoding='utf-8',
line_buffering=True, # send data on newline
)
stdout = io.TextIOWrapper(
proc.stdout,
encoding='utf-8',
)
for i in range(5):
line = '{}\n'.format(i)
stdin.write(line)
output = stdout.readline()
print(output.rstrip())
remainder = proc.communicate()[0].decode('utf-8')
print(remainder)
print()
print('All output at once:')
proc = subprocess.Popen(
'python3 repeater.py',
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdin = io.TextIOWrapper(
proc.stdin,
encoding='utf-8',
)
for i in range(5):
line = '{}\n'.format(i)
stdin.write(line)
stdin.flush()
output = proc.communicate()[0].decode('utf-8')
print(output)
进程间发信号
signal_child.py
import os
import signal
import time
import sys
pid = os.getpid()
received = False
def signal_usr1(signum, frame):
"Callback invoked when a signal is received"
global received
received = True
print('CHILD {:>6}: Received USR1'.format(pid))
sys.stdout.flush()
print('CHILD {:>6}: Setting up signal handler'.format(pid))
sys.stdout.flush()
signal.signal(signal.SIGUSR1, signal_usr1)
print('CHILD {:>6}: Pausing to wait for signal'.format(pid))
sys.stdout.flush()
time.sleep(3)
if not received:
print('CHILD {:>6}: Never received signal'.format(pid))
signal_parent.py
import os
import signal
import subprocess
import time
import sys
proc = subprocess.Popen(['python3', 'signal_child.py'])
print('PARENT : Pausing before sending signal...')
sys.stdout.flush()
time.sleep(1)
print('PARENT : Signaling child')
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)
Process Groups / Sessions
import os
import signal
import subprocess
import tempfile
import time
import sys
script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()
proc = subprocess.Popen(['sh', script_file.name])
print('PARENT : Pausing before signaling {}...'.format(
proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT : Signaling child {}'.format(proc.pid))
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)
time.sleep(3)
import os
import signal
import subprocess
import tempfile
import time
import sys
def show_setting_prgrp():
print('Calling os.setpgrp() from {}'.format(os.getpid()))
os.setpgrp()
print('Process group is now {}'.format(os.getpgrp()))
sys.stdout.flush()
script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()
proc = subprocess.Popen(
['sh', script_file.name],
preexec_fn=show_setting_prgrp,
)
print('PARENT : Pausing before signaling {}...'.format(
proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT : Signaling process group {}'.format(
proc.pid))
sys.stdout.flush()
os.killpg(proc.pid, signal.SIGUSR1)
time.sleep(3)