Linking subprocesses with tee in Python
I'm experimenting with tee and Python's subprocesses and have run into some issues. I would like to write a Python program that takes a command (one that writes to stdout) and feeds its output to a given number of other commands. I have realized it may be possible to use FIFOs for this.
I have also found various solutions that do the copying in Python; I am trying to restrict myself to low-level OS operations for performance reasons (the idea is to write a backup tool, so the data streams may be huge), although I might be mistaken, and using pure Python operations might be the way to go (here's an example of how it could be done in Python).
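If the aim is to keep the data path entirely outside Python, the FIFO idea can work as long as no descriptor handed to tee is non-blocking. A minimal sketch, assuming a POSIX system with `sh` and `tee` on the PATH; the helper name `tee_fan_out` and the `fanout_<i>` FIFO names are hypothetical. Python only wires up descriptors: the producer writes into an ordinary (blocking) pipe, `tee` copies it into one FIFO per consumer, and each consumer is started through `sh` so that the blocking `open()` of its FIFO happens in the child, never in the parent.

```python
import os
import subprocess
from typing import List, Sequence


def tee_fan_out(producer: Sequence[str], consumers: List[Sequence[str]], workdir: str) -> List[int]:
    """Pipe `producer` into tee, which writes one copy per consumer FIFO.

    Returns exit codes: producer, tee, then each consumer.
    """
    fifos = []
    cons = []
    for i, cmd in enumerate(consumers):
        path = os.path.join(workdir, f"fanout_{i}")  # hypothetical naming scheme
        os.mkfifo(path)
        fifos.append(path)
        # The child shell performs the blocking open of the FIFO, then execs
        # the consumer with the FIFO as its stdin ($0 is the path, "$@" the command).
        cons.append(subprocess.Popen(["sh", "-c", 'exec "$@" < "$0"', path, *cmd]))

    prod = subprocess.Popen(producer, stdout=subprocess.PIPE)
    # A regular blocking pipe as tee's stdin, so tee never sees EAGAIN.
    # tee's own stdout is discarded; every copy goes to a FIFO.
    tee = subprocess.Popen(["tee", *fifos], stdin=prod.stdout, stdout=subprocess.DEVNULL)
    prod.stdout.close()  # only tee should hold the read end now

    codes = [prod.wait(), tee.wait()] + [c.wait() for c in cons]
    for path in fifos:
        os.unlink(path)
    return codes
```

The consumers are started first; tee then blocks in `open()` on each FIFO until the matching consumer has opened the read end, so no data is written before every reader is in place. A consumer that exits early will make tee fail with a broken pipe.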
Here's a minimal version of what I have written so far:
```python
import os
import subprocess


def test_pipe_fanout_example():
    fifo_input = "/tmp/fifo_i"
    fifo_output = "/tmp/fifo_o"
    for filename in [fifo_input, fifo_output]:
        os.mkfifo(filename)
    input_cmd = "echo -n 'hello world'"
    out_file = "/tmp/out"
    stdin = os.open(fifo_output, os.O_RDONLY | os.O_NONBLOCK)
    op = subprocess.Popen(f"cat > {out_file}", stdin=stdin, shell=True, close_fds=True)
    fanout_command = f"tee {fifo_output}"
    fanout_stdin = os.open(fifo_input, os.O_RDONLY | os.O_NONBLOCK)
    stdout = os.open(fifo_input, os.O_WRONLY)
    print("Running input")
    ip = subprocess.Popen(input_cmd, stdout=stdout, shell=True, close_fds=True)
    print("Running fanout")
    fanout = subprocess.Popen(
        fanout_command,
        stdin=fanout_stdin,
        shell=True, close_fds=True
    )
    ip.wait()
    print("IP", ip.returncode)
    os.close(stdout)
    fanout.wait()
    os.close(fanout_stdin)
    print(f"fanout {fanout}")
    op.wait()
    print(f"OP {op}")
    for filename in [fifo_input, fifo_output]:
        os.unlink(filename)
    with open(out_file) as f:
        data = f.read()
    assert data == "hello world"
```
The program runs, but the /tmp/out file is empty at the end. Does anyone have a clue what I might be doing wrong, or whether using Python's primitives might be a better idea?
Edit: I suspect the problem comes from the fact that tee expects to read stdin in a blocking manner. Running the tee subprocess before the input process makes it fail with "/usr/bin/tee: read error: Resource temporarily unavailable", while running it after the input subprocess causes some of the streamed data to be lost.
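The diagnosis in the edit can be checked in isolation. The culprit appears to be the O_NONBLOCK flag: it lives on the open file description, so when the parent opens the FIFO with it and passes the descriptor as tee's stdin, tee inherits a non-blocking stdin, and a read on an empty FIFO whose writer is still open fails with EAGAIN instead of waiting. A minimal sketch (the function name `nonblocking_fifo_demo` is hypothetical) reproduces the error and shows that `os.set_blocking(fd, True)` clears the flag:

```python
import os
import tempfile


def nonblocking_fifo_demo():
    """Reproduce tee's EAGAIN on a non-blocking FIFO, then clear O_NONBLOCK."""
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "fifo")
        os.mkfifo(path)
        # O_NONBLOCK lets open() return before any writer exists, but the flag
        # stays set and is inherited by any child given this descriptor.
        r = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
        w = os.open(path, os.O_WRONLY)
        try:
            os.read(r, 100)  # writer present, buffer empty
        except BlockingIOError:  # errno EAGAIN: "Resource temporarily unavailable"
            got_eagain = True
        else:
            got_eagain = False
        os.write(w, b"hello")
        os.close(w)  # last writer gone: the reader gets the data, then EOF
        os.set_blocking(r, True)  # clear O_NONBLOCK before handing r to a child
        data = os.read(r, 100)
        eof = os.read(r, 100)  # b"" because no writer remains
        os.close(r)
        return got_eagain, data, eof
```

Calling `nonblocking_fifo_demo()` returns `(True, b"hello", b"")`: the first read fails exactly as tee's did, and after the flag is cleared the same descriptor behaves like an ordinary blocking pipe.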
Solution 1:[1]
Thanks to the comments on the question, and after fiddling some more, I think I can present a solution to this problem. The tee operation is indeed not too complicated to implement in Python!
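The answer's `fd_fanout` copies to a single descriptor, while the question asks for several consumers. A hedged generalization at the raw-descriptor level (the names `fd_fanout_many` and `write_all` are hypothetical) reads with a blocking `os.read` until it returns `b""`, which only happens once every writer has closed its end, and loops on `os.write` because a single call may write fewer bytes than requested:

```python
import os
from typing import Iterable


def write_all(fd: int, data: bytes) -> None:
    """os.write may perform a partial write; retry until everything is out."""
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def fd_fanout_many(fd_in: int, fds_out: Iterable[int], chunk_size: int = 64 * 1024) -> None:
    """Copy fd_in to every descriptor in fds_out until EOF, then close them all."""
    fds_out = list(fds_out)
    os.set_blocking(fd_in, True)  # a FIFO opened with O_NONBLOCK would raise EAGAIN
    try:
        while True:
            chunk = os.read(fd_in, chunk_size)
            if not chunk:  # b"": every writer closed its end
                break
            for fd in fds_out:
                write_all(fd, chunk)
    finally:
        # Closing the outputs is what lets each reader see EOF.
        os.close(fd_in)
        for fd in fds_out:
            os.close(fd)
```

Working on raw descriptors avoids a second layer of buffering in Python; the chunk size is an arbitrary choice and could be tuned for large backup streams.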
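```python
import os
import subprocess


def fd_fanout(fd_in, fd_out):
    # Copy everything from fd_in to fd_out until EOF. With a blocking
    # descriptor, read() returns b"" only once every writer has closed
    # its end; a non-blocking one returns None whenever the FIFO is
    # momentarily empty, which would end the copy too early.
    os.set_blocking(fd_in, True)
    with os.fdopen(fd_in, "rb") as file_in, os.fdopen(fd_out, "wb") as file_out:
        while True:
            data = file_in.read(1000)
            if not data:
                break
            file_out.write(data)
    # Leaving the with-block flushes file_out and closes both descriptors,
    # which delivers EOF to whoever reads from fd_out.


def test_pipe_fanout_example():
    data_in = "hello world" * 1000
    fifo_input = "/tmp/fifo_i"
    fifo_output = "/tmp/fifo_o"
    for filename in [fifo_input, fifo_output]:
        os.mkfifo(filename)
    input_cmd = ["/usr/bin/echo", "-n", data_in]
    out_file = "/tmp/out"

    # O_NONBLOCK only lets open() return before a writer exists; switch the
    # read end back to blocking before cat inherits it, or cat can fail
    # with EAGAIN just like tee did.
    output_r = os.open(fifo_output, os.O_RDONLY | os.O_NONBLOCK)
    output_w = os.open(fifo_output, os.O_WRONLY)
    os.set_blocking(output_r, True)
    input_r = os.open(fifo_input, os.O_RDONLY | os.O_NONBLOCK)
    input_w = os.open(fifo_input, os.O_WRONLY)

    print("Running input")
    ip = subprocess.Popen(input_cmd, stdout=input_w, close_fds=True)
    # echo has its own copy of the write end; drop ours so that fd_fanout
    # sees EOF as soon as echo exits.
    os.close(input_w)

    print("Running output")
    with open(out_file, "wb") as out:
        op = subprocess.Popen(["cat"], stdin=output_r, stdout=out, close_fds=True)
    os.close(output_r)  # cat holds its own copy

    fd_fanout(input_r, output_w)  # also closes input_r and output_w

    ip.wait()
    print("IP", ip)
    op.wait()
    print("OP", op)

    for filename in [fifo_input, fifo_output]:
        os.unlink(filename)

    with open(out_file) as f:
        data = f.read()
    print("DATA", len(data))
    assert data == data_in
```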
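The named FIFOs are not strictly required for this approach: with `subprocess.PIPE`, Python creates anonymous pipes that are blocking and non-inheritable by default, so a copy loop like the one above can feed any number of commands directly. A sketch under those assumptions (the `fan_out` name and the `chunk_size` default are illustrative, not from the answer):

```python
import os
import subprocess
from typing import List, Sequence


def fan_out(producer: Sequence[str], consumers: List[Sequence[str]], chunk_size: int = 64 * 1024) -> List[int]:
    """Run `producer` and copy its stdout to the stdin of every consumer.

    Returns exit codes: producer first, then each consumer.
    """
    prod = subprocess.Popen(producer, stdout=subprocess.PIPE)
    # Pipes from subprocess are non-inheritable, so no consumer keeps another
    # consumer's stdin (or the producer's stdout) open by accident.
    cons = [subprocess.Popen(cmd, stdin=subprocess.PIPE) for cmd in consumers]
    src = prod.stdout.fileno()
    while True:
        chunk = os.read(src, chunk_size)  # blocking read; b"" means EOF
        if not chunk:
            break
        for c in cons:
            c.stdin.write(chunk)  # BufferedWriter retries partial writes
            c.stdin.flush()  # hand each chunk over immediately
    for c in cons:
        c.stdin.close()  # EOF for every consumer
    prod.stdout.close()
    return [prod.wait()] + [c.wait() for c in cons]
```

If a consumer exits before reading everything, `c.stdin.write` raises `BrokenPipeError`; a real backup tool would have to decide whether that aborts the whole run or just drops that consumer.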
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | saihtamtellim |
