Python multiprocessing-like execution over SSH

Consider the basic example from Python's multiprocessing docs page:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

This will execute f in separate processes that are auto-started, but on the local machine.

I see that multiprocessing has support for remote execution, but that requires the managers to be started manually and also looks to be networking-only (i.e. it does not run over SSH, and there is no support for e.g. serialization over stdin / stdout or something of the sort).

Is there a way to call Python functions (as opposed to executables, as can be done e.g. using paramiko.client.SSHClient.exec_command) on remote hosts via SSH automatically? By "automatically" I mean without needing to manually handle starting / stopping processes and communication (serialization of input parameters and return values).



Solution 1:[1]

The code below is an example of how I would execute multiple remote commands concurrently using a thread pool. command1 shows how I would invoke a remote Python function. Note that the full path to the Python interpreter may be required (command1 uses a path under the home directory), since your .bash_profile script will not have been executed and the usual PATH environment variable is therefore not set up.

The return value from invoking foo is printed, so it becomes the stdout part of the [stdout, stderr] response returned by connect_and_execute_command. By definition this is a string. If foo returns a builtin type whose string representation is a Python literal, such as an int or a dict containing builtin types for its values, then that string can be converted back into its "real" type using ast.literal_eval.
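The conversion step on its own can be tried without any SSH connection. The following is a minimal sketch; the two sample strings are made-up stand-ins for what a remote print(foo(...), end='') might write to stdout, not output from the answer's actual foo.

```python
from ast import literal_eval

# Made-up stand-ins for stdout text produced by a remote print(...) call.
stdout_int = repr(36)
stdout_dict = repr({'squares': [1, 4, 9], 'ok': True})

# literal_eval turns the text back into real Python objects.
value_int = literal_eval(stdout_int)
value_dict = literal_eval(stdout_dict)

print(type(value_int).__name__, value_int)
print(type(value_dict).__name__, value_dict)

# Text that is not a plain literal (here, a function call) is rejected
# with ValueError instead of being executed.
try:
    literal_eval("__import__('os').getcwd()")
except ValueError:
    rejected = True
else:
    rejected = False
print('call expression rejected:', rejected)
```

Because literal_eval only accepts literals, it is a safer choice than eval for text that comes back from another machine.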

import paramiko
from multiprocessing.pool import ThreadPool
from ast import literal_eval

def execute_command(client, command):
    """
    Execute a command with client, which is already connected to some host.
    """
    stdin_, stdout_, stderr_ = client.exec_command(command)
    return [stdout_.read().decode(), stderr_.read().decode()]

def connect(hostname, username, password=None):
    client = paramiko.client.SSHClient()
    client.load_system_host_keys()
    client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
    client.connect(hostname=hostname, username=username, password=password)
    return client

def connect_and_execute_command(command, hostname, username, password=None):
    # Pass the password through; otherwise it would be silently ignored.
    with connect(hostname, username, password) as client:
        return execute_command(client, command)

command0, host0, user0, password0 = 'ls -l', 'some_host0', 'some_username0', 'some_password0'
command1, host1, user1, password1 = '''~/my_local_python/python -c "from temp import foo; print(foo(6), end='')"''', 'some_host1', 'some_username1', 'some_password1'

requests = ((command0, host0, user0, password0), (command1, host1, user1, password1))
with ThreadPool(len(requests)) as pool:
    results = pool.starmap(connect_and_execute_command, requests)
    # Convert stdout response from command1:
    results[1][0] = literal_eval(results[1][0])
    for stdout_response, _ in results:
        print(stdout_response, end='')
        print()
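The command1 pattern above can be generalized to other functions and arguments. The following is a rough sketch, not part of the original answer: build_python_command and run_locally are hypothetical helper names, subprocess stands in for the SSH connection so the round trip can be tried on one machine, and math.factorial stands in for a remote function such as foo. It prints repr(...) of the result rather than the plain value so that string results also survive literal_eval.

```python
import shlex
import subprocess
import sys
from ast import literal_eval

def build_python_command(interpreter, module, function, *args):
    """Hypothetical helper: argv that calls module.function(*args)
    and prints the repr of the result to stdout."""
    arg_list = ', '.join(repr(arg) for arg in args)
    code = (f"from {module} import {function}; "
            f"print(repr({function}({arg_list})), end='')")
    return [interpreter, '-c', code]

def run_locally(argv):
    """Stand-in for the SSH round trip: run argv as a local subprocess
    and convert its stdout back into a Python object."""
    completed = subprocess.run(argv, capture_output=True, text=True, check=True)
    return literal_eval(completed.stdout)

# math.factorial is only a placeholder for a remote function.
argv = build_python_command(sys.executable, 'math', 'factorial', 5)
result = run_locally(argv)
print(result)

# For exec_command, which takes a single string, the argv list can be
# quoted into one shell command line (shlex.join needs Python 3.8+).
command = shlex.join(argv)
print(command)
```

On a real remote host the interpreter argument would be the remote interpreter's path (sys.executable is only correct locally), and the resulting command string would be passed to connect_and_execute_command in place of command1. Arguments are limited to values whose repr is valid Python source.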

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Booboo