Linking subprocesses with tee in Python

I'm playing with tee and Python's subprocesses, and I'm running into some issues. I would like to write a Python program that takes a command (one that writes to stdout) and feeds its output to a given number of other commands. I have realized it may be possible to use FIFOs for this.

I have also found various solutions involving Python; I am trying to restrict myself to low-level OS operations for performance reasons (the idea would be to write a backup tool, so the data streams may be huge), although I might be mistaken, and using pure Python operations might be the way to go (here's an example of how it could be done in Python).

Here's a small version of what I have written so far:

import os
import subprocess

def test_pipe_fanout_example():
    fifo_input = "/tmp/fifo_i"
    fifo_output = "/tmp/fifo_o"
    for filename in [fifo_input, fifo_output]:
        os.mkfifo(filename)

    input_cmd = "echo -n 'hello world'"
    out_file = "/tmp/out"

    stdin = os.open(fifo_output, os.O_RDONLY | os.O_NONBLOCK)
    op = subprocess.Popen(f"cat > {out_file}", stdin=stdin, shell=True, close_fds=True)

    fanout_command = f"tee {fifo_output}"
    fanout_stdin = os.open(fifo_input, os.O_RDONLY | os.O_NONBLOCK)
    stdout = os.open(fifo_input, os.O_WRONLY)

    print("Running input")
    ip = subprocess.Popen(input_cmd, stdout=stdout, shell=True, close_fds=True)

    print("Running fanout")
    fanout = subprocess.Popen(
        fanout_command,
        stdin=fanout_stdin,
        shell=True, close_fds=True
    )
    ip.wait()
    print("IP", ip.returncode)
    os.close(stdout)
    fanout.wait()
    os.close(fanout_stdin)
    print(f"fanout {fanout}")

    op.wait()
    print(f"OP {op}")


    for filename in [fifo_input, fifo_output]:
        os.unlink(filename)

    with open(out_file) as f:
        data = f.read()
        assert data == "hello world"

The program runs, but the /tmp/out file is empty at the end of the operation. Does anyone have a clue about what I might be doing wrong, or whether using Python's primitives might be a better idea?


Edit: I feel like the problem might come from the fact that tee reads stdin in a blocking manner. Because of this, running the tee subprocess before the input process fails with /usr/bin/tee: read error: Resource temporarily unavailable, while running it after the input subprocess leads to some of the streamed data being lost.
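
The two FIFO behaviors that seem to be involved here can be reproduced in isolation. The sketch below assumes POSIX FIFO semantics as found on Linux; the helper name probe_nonblocking_fifo and the temporary path are illustrative and do not come from the code above. With no writer attached, a non-blocking read returns an empty result (end of file), which would make a reader such as cat exit immediately. With a writer attached but no data available, the read fails with EAGAIN, which is the "Resource temporarily unavailable" error that tee reports.

```python
import errno
import os
import tempfile


def probe_nonblocking_fifo():
    """Return (bytes read with no writer, errno raised with an idle writer)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "fifo")
        os.mkfifo(path)
        # O_NONBLOCK lets the read end open without waiting for a writer.
        reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            # Case 1: nobody has the FIFO open for writing, so read reports EOF.
            no_writer = os.read(reader, 1024)

            # Case 2: a writer exists but has not written anything yet.
            writer = os.open(path, os.O_WRONLY)
            try:
                os.read(reader, 1024)
                idle_errno = None
            except BlockingIOError as exc:
                idle_errno = exc.errno
            finally:
                os.close(writer)
        finally:
            os.close(reader)
    return no_writer, idle_errno
```

Calling probe_nonblocking_fifo() returns the empty read from the first case together with the error number from the second, which can be compared against errno.EAGAIN.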



Solution 1:[1]

Thanks to the comments on the original post, and after fiddling some more, I think I can present a solution to this problem. The tee operation is indeed not too complicated to implement in Python!

import os
import subprocess


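def fd_fanout(fd_in, fd_out):
    # Copy fd_in to fd_out until every writer of fd_in has closed it (EOF).
    # Blocking reads are used on purpose: on a non-blocking file, read()
    # returns None whenever the FIFO is momentarily empty, which is not EOF.
    os.set_blocking(fd_in, True)
    with os.fdopen(fd_in, "rb") as file_in, os.fdopen(fd_out, "wb") as file_out:
        while True:
            data = file_in.read(1000)
            if not data:  # b"" means end of stream
                break
            file_out.write(data)
    # Leaving the with block flushes file_out and closes both descriptors.


def test_pipe_fanout_example():
    data_in = "hello world"*1000
    fifo_input = "/tmp/fifo_i"
    fifo_output = "/tmp/fifo_o"
    for filename in [fifo_input, fifo_output]:
        os.mkfifo(filename)

    input_cmd = ["/usr/bin/echo", "-n", data_in]
    out_file = "/tmp/out"

    # O_NONBLOCK only keeps open() from waiting for the other end of the FIFO.
    output_r = os.open(fifo_output, os.O_RDONLY | os.O_NONBLOCK)
    output_w = os.open(fifo_output, os.O_WRONLY)
    input_r = os.open(fifo_input, os.O_RDONLY | os.O_NONBLOCK)
    input_w = os.open(fifo_input, os.O_WRONLY)
    # cat expects a blocking stdin, so clear the flag before handing it over.
    os.set_blocking(output_r, True)

    out = open(out_file, "wb")

    print("running input")
    ip = subprocess.Popen(input_cmd, stdout=input_w, close_fds=True)
    # The child holds its own copy; closing ours lets fd_fanout see EOF.
    os.close(input_w)

    print("Running output")
    op = subprocess.Popen("cat", stdin=output_r, stdout=out, close_fds=True)
    # cat now owns copies of these descriptors, so the parent can drop them.
    os.close(output_r)
    out.close()

    # Closes input_r and output_w when done, which signals EOF to cat.
    fd_fanout(input_r, output_w)

    ip.wait()
    print("IP", ip)

    op.wait()
    print("OP", op)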

    for filename in [fifo_input, fifo_output]:
        os.unlink(filename)

    with open(out_file) as f:
        data = f.read()
        print("DATA", len(data))
        assert data == data_in
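
For comparison, the same fan-out can be sketched without named FIFOs, using the anonymous pipes that subprocess creates, and with more than one consumer. This is a minimal sketch rather than a tuned implementation: fan_out, its parameters, and the 64 KiB chunk size are assumptions made for illustration. It also assumes every consumer reads its stdin to the end; a consumer that exits early would raise BrokenPipeError here.

```python
import subprocess


def fan_out(producer_cmd, consumer_cmds, chunk_size=64 * 1024):
    """Feed the stdout of producer_cmd to the stdin of every consumer command.

    Returns the exit codes, producer first, then consumers in order.
    """
    producer = subprocess.Popen(producer_cmd, stdout=subprocess.PIPE)
    consumers = [
        subprocess.Popen(cmd, stdin=subprocess.PIPE) for cmd in consumer_cmds
    ]

    while True:
        # Blocking read: returns b"" only once the producer closes its stdout.
        chunk = producer.stdout.read(chunk_size)
        if not chunk:
            break
        for consumer in consumers:
            consumer.stdin.write(chunk)

    # Closing each stdin flushes pending data and signals EOF to the consumer.
    for consumer in consumers:
        consumer.stdin.close()
    producer.stdout.close()

    return [producer.wait()] + [consumer.wait() for consumer in consumers]
```

A call such as fan_out(["/usr/bin/echo", "-n", "hello world"], [["cat"], ["wc", "-c"]]) would send the same bytes to both commands. Each chunk is written to every consumer in turn, so a slow consumer slows the whole pipeline, much as it would with tee.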

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 saihtamtellim