How to run a Python subprocess and stream but also filter stdout and stderr?

I have a server-like app I want to run from Python. It never stops until the user interrupts it. I want to continuously redirect both stdout and stderr to the parent while the app runs. Luckily, that's exactly what subprocess.run does.

Shell:

$ my-app
1
2
3
...

wrapper.py:

import subprocess
subprocess.run(['my-app'])

Executing wrapper.py:

$ python wrapper.py
1
2
3
...

I believe this works because subprocess.run inherits the stdout and stderr file descriptors from the parent process. Good.

But now I need to do something when the app outputs a particular line. Imagine I want to run arbitrary Python code whenever an output line contains 4:

$ python wrapper.py
1
2
3
4   <-- here I want to do something
...

Or I want to remove some lines from the output:

$ python wrapper.py   <-- allowed only odd numbers
1
3
...

I thought I could write a filtering function and somehow hook it into the subprocess.run call, so that it gets called with every line of output, regardless of whether the line comes from stdout or stderr:

def filter_fn(line):
    if line ...:
        return line.replace(...
    ...

But how can I achieve this? How do I hook such a function (or something similar) into the subprocess.run call?


Note: I can't use the sh library as it has zero support for Windows.



Solution 1:[1]

If you want to process stdout or stderr of a subprocess, pass subprocess.PIPE for the stdout (resp. stderr) parameter. You can then access the subprocess's output stream as proc.stdout, by default as a byte stream, or as strings if you pass universal_newlines=True (aliased to text=True since Python 3.7). Example:

import subprocess
import sys

app = subprocess.Popen(['my-app'], stdout=subprocess.PIPE, universal_newlines=True)
for line in app.stdout:
    if line.strip() == '4':
        pass  # special processing here
    else:
        sys.stdout.write(line)
app.wait()

What you must pay attention to is that, to process output as soon as the subprocess writes it, the subprocess must flush its output after each line. By default, stdout is line-buffered when directed to a terminal (each line is flushed at the newline) but block-buffered when directed to a file or pipe, meaning it is flushed only every few kilobytes.

In that case, whatever you do on the caller side, you will only see the output once the buffer fills or the program finishes.
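If the child happens to be a Python script, one common mitigation is to launch it with python -u so its stdout is unbuffered; on the parent side, bufsize=1 together with text mode gives line buffering. A minimal, self-contained sketch, where the child is a hypothetical stand-in for my-app that prints the numbers 1 to 6:

```python
import subprocess
import sys

# Hypothetical stand-in for my-app: prints the numbers 1..6, one per line.
# "-u" makes the child's stdout unbuffered, so each line reaches the pipe
# immediately instead of sitting in a block buffer.
child = [sys.executable, "-u", "-c", "for i in range(1, 7): print(i)"]

proc = subprocess.Popen(
    child,
    stdout=subprocess.PIPE,
    bufsize=1,                 # line buffering on the parent's pipe (text mode only)
    universal_newlines=True,   # decode bytes to str
)

forwarded = []
for line in proc.stdout:
    n = int(line)
    if n == 4:
        forwarded.append("special")     # "do something" when the line is 4
    elif n % 2 == 1:
        forwarded.append(line.strip())  # forward only the odd numbers
        sys.stdout.write(line)
proc.wait()

print(forwarded)  # → ['1', '3', 'special', '5']
```

For a non-Python child you cannot patch, tools like stdbuf on Linux can force line buffering, but there is no portable Windows equivalent.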

Solution 2:[2]

I believe this code will do it. The previous answer does not address reading from two streams at the same time, which requires asyncio. Otherwise the other answer could work for filtering stdout first and then processing stderr once stdout is exhausted.

This targets Python 3.8, which has more descriptive method names for asyncio.

Update 2021-Aug-25: using asyncio.run and asyncio.gather as higher-level, easier-to-understand functions rather than manipulating the asyncio event loop directly.

import sys
import asyncio


async def output_filter(input_stream, output_stream):
    while not input_stream.at_eof():
        output = await input_stream.readline()
        if not output.startswith(b"filtered"):
            output_stream.buffer.write(output)
            output_stream.flush()


async def run_command(command):
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    await asyncio.gather(
        output_filter(process.stderr, sys.stderr),
        output_filter(process.stdout, sys.stdout),
    )
    # process.communicate() will have no data to read but will close the
    # pipes that are implemented in C, whereas process.wait() will not
    await process.communicate()


def main():
    asyncio.run(run_command(["python", "sample_process.py"]))


if __name__ == "__main__":
    main()
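For readers who prefer to avoid asyncio, the same two-stream filtering can also be done with one reader thread per pipe. This is a sketch under the assumption that the child is a small hypothetical Python script standing in for the asker's actual my-app:

```python
import subprocess
import sys
import threading


def pump(stream, sink, kept):
    # Read the child's byte stream line by line; drop lines that start with
    # b"filtered" and forward everything else to the given text sink.
    for raw in iter(stream.readline, b""):
        if not raw.startswith(b"filtered"):
            sink.write(raw.decode())
            sink.flush()
            kept.append(raw)


# Hypothetical child that writes to both stdout and stderr.
child = [sys.executable, "-u", "-c",
         "import sys;"
         "print('kept');"
         "print('filtered out');"
         "print('an error', file=sys.stderr)"]

proc = subprocess.Popen(child, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

out_kept, err_kept = [], []
threads = [
    threading.Thread(target=pump, args=(proc.stdout, sys.stdout, out_kept)),
    threading.Thread(target=pump, args=(proc.stderr, sys.stderr, err_kept)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
proc.wait()
```

Each thread blocks independently on its own pipe, so neither stream can stall the other; the trade-off versus the asyncio version is two extra threads instead of a single event loop.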

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Community