How to send and receive messages between processes in Python (through channels)?

I am trying to implement a simple program in which several processes communicate concurrently by sending and receiving messages. The program has 4 participants (each corresponding to a process), which communicate with each other as follows:

P1 sends P2 some_message then P2 sends P3 another_message then P3 sends P4 a_message. Based on the messages each participant receives, they perform a specific action.

Obviously, when, for instance, P1 sends P2 a message, P2 is receiving that message from P1, so they are paired.

I have found different approaches, none of which is suitable because they seem too complicated for what I am looking for. For example,

  • Python MPI which has a restriction of "There are not enough slots available in the system". There are a few ways suggested to sort out the issue but the solutions are a bit complicated.
  • Socket programming which mostly suits server and client scenario. But my program doesn't have a server. I also checked this answer, which is again based on socket programming.

My question is: is there a simpler approach than the ones above with which I can implement what I described? Is it possible to create communication channels in Python fairly similar to the ones in Golang?



Solution 1:[1]

I wrote this code a while ago to get to grips with os.pipe. It is self-contained but not "minimally reproducible", since I don't have the time to redo it. It uses tkinter UIs to simulate processes and sends and receives data between them. Note that the code was written only for my private purposes.

"""Test run of the use of pipes between processes.
       .. processes are control, startup , send and receive.
       .. pipes from control to startup and send
       .. pipe from startup to send
       .. pipe from send to receive
       .. startup, user input of run mode 
            ... prompt, timer (seconds) or number of runs
       .. send, user input of data 
       .. receive, display of data received

    . each process operates independently of, and in isolation from, the other processes until data is transferred through pipes 
    
""" 
#     fr read file descriptor
#     fw write file descriptor
#    wbs write bytes
#    snb string length of output filled with 0 to write as header
#   bsnb for number of bytes written, needed for read
# maxbuf number of bytes of header, 4 digits, max 9999 characters in a string/byte literal
#    onb output number of bytes
#    dbs data read in bytes

import tkinter as tk
from os import pipe as ospipe
from os import read as osread
from os import write as oswrite
from os import close as osclose
from datetime import datetime as dt
from time import monotonic as clock
from functools import partial

# Colour strings shared by the windows below (short hex form).
BG = '#fa4'      # window background; also used for labels and flat buttons
TBG = '#fe8'     # background of the text/entry widgets and highlighted controls
SndBG = '#f91'   # background of the send button
BLK = '#000'     # background of the small close button in the corner
STOP = '#d30'    # marker placed beside the Control 'stop' button
START =  '#0b0'  # marker placed beside the Control 'start' button
# Monotonic clock reading taken once, when this module is executed;
# timer() measures elapsed time against this value.
start = clock()

def timer(halt):
    """Tell whether more than `halt` seconds have gone by since `start`.

    The current monotonic clock reading is truncated to whole seconds, the
    module-level `start` value is subtracted, and the difference is truncated
    again before it is compared with `halt`.

    Returns True once the truncated elapsed time exceeds `halt`, else False.
    """
    now = int(clock())
    elapsed = int(now - start)
    return elapsed > halt

def piperead(r):
    """Read one length-prefixed message from a pipe, then close the read end.

    The message layout is a 4-digit, zero-padded decimal header giving the
    number of payload bytes, followed by the UTF-8 encoded payload.

    r -- read file descriptor of the pipe; it is always closed before this
         function returns or raises.

    Returns the payload decoded to a string.
    Raises ValueError when the header is not a decimal number (this includes
    a pipe that was closed without any data being written).
    """
    maxbuf = 4

    def _readexact(count):
        # A single os.read may return fewer bytes than requested, so keep
        # reading until `count` bytes have arrived or the writer has closed.
        chunks = []
        while count > 0:
            chunk = osread(r, count)
            if not chunk:       # end of file: the write end is closed
                break
            chunks.append(chunk)
            count -= len(chunk)
        return b''.join(chunks)

    try:
        onb = _readexact(maxbuf)
        oi = int(onb.decode())
        dbs = _readexact(oi).decode()    # bytes to string
    finally:
        # Close even when parsing or decoding fails, so the descriptor
        # does not leak.
        osclose(r)
    return dbs

def pipewrite(w,s):
    """Write one length-prefixed message to a pipe, then close the write end.

    The message layout is a 4-digit, zero-padded decimal header giving the
    number of payload bytes, followed by the UTF-8 encoded payload.

    w -- write file descriptor of the pipe; it is always closed before this
         function returns or raises.
    s -- string to send.

    Raises ValueError when the encoded payload needs more than 4 header
    digits (more than 9999 bytes), because the reader could not parse it.
    """
    maxbuf = 4
    try:
        wbs = bytes(s, encoding='utf-8')
        if len(wbs) >= 10 ** maxbuf:
            raise ValueError(
                'message of %d bytes does not fit the %d-digit length header'
                % (len(wbs), maxbuf))
        # The header must count encoded BYTES, not characters: the reader
        # uses it as a byte count, and non-ASCII characters take several bytes.
        bsnb = bytes(str(len(wbs)).zfill(maxbuf), encoding='utf-8')
        wbs = bsnb + wbs
        # os.write may accept only part of the buffer; send the remainder.
        while wbs:
            wbs = wbs[oswrite(w, wbs):]
    finally:
        osclose(w)

def setpipe(process, sub=None, vars=None):
    """Open a pipe, run `process` on its write end, and return what was sent.

    process -- callable invoked with the write file descriptor; it is expected
               to have written a message by the time it returns.
    sub     -- optional name; when truthy it is passed on, paired with `vars`,
               as the keyword argument proc=(sub, vars).
    vars    -- optional value that accompanies `sub`.

    Returns the string read back from the read end of the pipe.
    """
    read_fd, write_fd = ospipe()
    if not sub:
        process(write_fd)
    else:
        process(write_fd, proc=(sub, vars))
    return piperead(read_fd)

class Sloop():
    """Sender window: the user types text, which is written to a pipe.

    The instance parameter is named `sl` rather than the conventional `self`.
    Creating an instance builds the window and runs the Tk main loop inside
    __init__, so the constructor call does not return until that loop ends.
    """
    def __init__(sl, pipewrite=None):
        """Build the sender window and run its event loop.

        pipewrite -- write file descriptor later passed to the module-level
                     pipewrite() function (the parameter hides that function's
                     name only inside this method).
        """
        sl.fw = pipewrite
        sl.w = tk.Tk()
        sl.w.geometry('400x200-100+80')
        sl.w.overrideredirect(1)
        sl.w['bg'] = BG
        # Creates a Tk font named 'uifnt'; the returned name is not used
        # anywhere else in this class.
        uifnt = sl.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
        # Small button in the corner: sends 'stop' and closes the window.
        # NOTE(review): every widget is pack()ed and then place()d; the place()
        # call presumably decides the final position - confirm pack() is needed.
        sl.lvb = tk.Button(sl.w, bg=BLK, activebackground=BG, relief='flat', command=sl.stop)
        sl.lvb.pack()
        sl.lvb.place(width=15,height=15, x=380,y=10)
        # Send button; chr(11166) is code point U+2B9E, used as its label.
        sl.sndb = tk.Button(sl.w,bg=SndBG,activebackground=BG,fg=TBG,  text=chr(11166), command=sl.send)
        sl.sndb.pack()
        sl.sndb.place(width=25,height=25, x=20,y=160)
        sl.tlbl = tk.Label(sl.w,bg=BG, text='write data to send...')
        sl.tlbl.pack()
        sl.tlbl.place(x=20,y=20)
        # Text area holding the data to send.
        sl.t = tk.Text(sl.w,bg=TBG)
        sl.t.pack()
        sl.t.place(width=300,height=100, x=20,y=45)
        sl.t.focus_set()
        sl.w.mainloop() 

    def send(sl):
        """Write the text area's content to the pipe, or show an error if empty."""
        sl.output = sl.t.get('1.0','end')
        # A lone newline is treated as "nothing typed".
        if sl.output != '\n':
            pipewrite(sl.fw,sl.output)
            sl.close()
        else:
            sl.error()

    def error(sl):
        """Show a 'nothing to send' label that is removed on the next key press."""
        def _clearlbl(ev):
            sl.erlbl.destroy()

        sl.erlbl = tk.Label(sl.w,bg=TBG,text='there is nothing to send')              
        sl.erlbl.pack()
        sl.erlbl.place(x=20,y=160)
        sl.t.focus_set()
        sl.t.bind('<KeyPress>',_clearlbl)

    def stop(sl):
        """Write the literal message 'stop' to the pipe and close the window."""
        pipewrite(sl.fw,'stop')
        sl.close()

    def close(sl):
        """Destroy the window."""
        sl.w.destroy()

class Rloop():
    """Receiver window: reads one message from a pipe and displays it.

    The instance parameter is named `rl` rather than the conventional `self`.
    Creating an instance builds the window, reads the message, and then runs
    the Tk main loop inside __init__.
    """
    def __init__(rl, pipefread=None):
        """Build the receiver window, read the message, and run the event loop.

        pipefread -- read file descriptor passed to piperead().
        """
        rl.fr = pipefread
        rl.w = tk.Tk()
        rl.w.geometry('400x200-100+320')
        rl.w.overrideredirect(1)
        rl.w['bg'] = BG
        # Creates a Tk font named 'uifnt'; it is applied to the text area below.
        uifnt = rl.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',10)
        # Small button in the corner: closes the window.
        rl.lvb = tk.Button(rl.w, bg=BLK, activebackground=BG, relief='flat', command=rl.close)
        rl.lvb.pack()
        rl.lvb.place(width=15,height=15, x=380,y=10)
        rl.tlbl = tk.Label(rl.w,bg=BG, text='received...')
        rl.tlbl.pack()
        rl.tlbl.place(x=20,y=20)
        # Text area in which the received data is shown.
        rl.t = tk.Text(rl.w,bg=TBG)
        rl.t['font'] = uifnt
        rl.t.pack()
        rl.t.place(width=300,height=100, x=20,y=45)
        rl.t.focus_set()
        # The message is read before the main loop is entered.
        rl.receive()
        rl.w.mainloop() 

    def receive(rl):
        """Read the message; show it with a timestamp, or close on 'stop'."""
        rec = piperead(rl.fr)
        if rec != 'stop':
            # Current date/time on the first line, the message after it.
            rl.t.insert('end', '\n'.join([str(dt.now()), rec]))
        else: rl.close()    

    def close(rl):
        """Destroy the window."""
        rl.w.destroy() 

class Startup():
    """Start-up window: the user picks a run mode, which is written to a pipe.

    The message sent is the string '<mode>,<value>' where <mode> is 'prompt',
    'timer' or 'runs'; pressing the corner button sends 'stop' instead.
    The instance parameter is named `su` rather than the conventional `self`.
    Creating an instance builds the window and runs the Tk main loop inside
    __init__.
    """
    def __init__(su, pipefwrite=None):
        """Build the mode-selection window and run its event loop.

        pipefwrite -- write file descriptor passed to pipewrite().
        """
        su.fw = pipefwrite
        # Mode string to send; stays '' until a mode has been chosen.
        su.mode = ''
        su.w = tk.Tk()
        su.w.geometry('400x200-100+500')
        su.w.overrideredirect(1)
        su.w['bg'] = BG
        # Creates a Tk font named 'uifnt'; the returned name is not used
        # anywhere else in this class.
        uifnt = su.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
        # Small button in the corner: sends 'stop' and closes the window.
        su.lvb = tk.Button(su.w, bg=BLK, activebackground=BG, relief='flat', command=su.stop)
        su.lvb.pack()
        su.lvb.place(width=15,height=15, x=380,y=10)
        # Send button; chr(11166) is code point U+2B9E, used as its label.
        su.sndb = tk.Button(su.w,bg=SndBG,activebackground=BG,fg=TBG,  text=chr(11166), command=su.send)
        su.sndb.pack()
        su.sndb.place(width=25,height=25, x=20,y=160)
        su.title = tk.Label(su.w,bg=BG, text='Modes to continue data input')
        su.title.pack()
        su.titley = 10
        su.title.place(x=20,y=su.titley)

        # Layout values: vertical spacing unit and x position of entry fields.
        su.ysp = 20
        su.margin = 200
        # One flat button per mode. 'prompt' sets the mode directly through
        # _get(); 'timer' and 'runs' first open an entry field through _enter().
        ptxt = 'prompt'
        su.pb = tk.Button(su.w,bg=BG, activebackground=BG, text=ptxt,  relief='flat', cursor='hand2', command=partial(su._get,e=None, nm='su.pb', ent=None))
        tmtxt = ' timer '
        su.tmb = tk.Button(su.w,bg=BG, activebackground=BG, text=tmtxt,  relief='flat', cursor='hand2', command=partial(su._enter,ent='su.tmb'))
        rntxt = '  runs  '
        su.rnb = tk.Button(su.w,bg=BG, activebackground=BG, text=rntxt, relief='flat', cursor='hand2', command=partial(su._enter,ent='su.rnb'))
        su.pb.pack()
        su.pby = su.titley + 1.5*su.ysp
        su.pb.place(x=25,y=su.pby)
        su.tmb.pack()
        su.tmby = su.pby + 2*su.ysp
        su.tmb.place(x=25,y=su.tmby)
        su.rnb.pack()
        su.rnby = su.pby + 4*su.ysp
        su.rnb.place(x=25,y=su.rnby)
        # Widgets keyed by their attribute name; _able() and _get() look
        # widgets up by these string keys.
        su.formd = {'su.pb':su.pb, 'su.tmb':su.tmb, 'su.rnb':su.rnb}
        su.w.mainloop()

    def _able(su,nm):
        """Enable the widgets whose key contains nm[0:4]; disable the others."""
        for key in su.formd: 
            # The first four characters ('su.p', 'su.t', 'su.r') identify the
            # group of widgets belonging to one mode.
            if nm[0:4] not in key: 
                su.formd[key]['state'] = 'disabled'
            else:
                su.formd[key]['state'] = 'normal'

    def _enter(su,ent):
        """Add a label and an entry field next to the 'timer' or 'runs' button.

        ent -- 'su.tmb' for the timer button or 'su.rnb' for the runs button;
               any other value does nothing.
        """
        if ent == 'su.tmb':
            tmtxt = 'seconds'
            su.tmlbl = tk.Label(su.w,bg=BG, text=tmtxt)
            su.tment = tk.Entry(su.w,bg=TBG)
            su.tmlbl.pack()
            su.tment.pack()
            tmlbly = su.tmby
            su.tmlbl.place(x=su._margin(tmtxt), y=tmlbly)
            su.tment.place(x=su.margin, y=tmlbly)
            su.tment.focus_set()
            # Pressing Return in the entry stores the value through _get().
            su.tment.bind('<Return>', partial(su._get,nm='su.tment', ent=su.tment))
            su.formd = su.formd | {'su.tmlbl':su.tmlbl, 'su.tment':su.tment}
        elif ent == 'su.rnb':
            rntxt = 'number'
            su.rnlbl = tk.Label(su.w,bg=BG, text=rntxt)
            su.rnent = tk.Entry(su.w,bg=TBG)
            su.rnlbl.pack()
            su.rnent.pack()
            rnlbly = su.rnby
            su.rnlbl.place(x=su._margin(rntxt), y=rnlbly)
            su.rnent.place(x=su.margin, y=rnlbly)
            su.rnent.focus_set()
            # Pressing Return in the entry stores the value through _get().
            su.rnent.bind('<Return>', partial(su._get,nm='su.rnent', ent=su.rnent))
            su.formd = su.formd | {'su.rnlbl':su.rnlbl, 'su.rnent':su.rnent}
  
    def _get(su,e,nm,ent):
        """Record the chosen mode in su.mode and restrict the form to it.

        e   -- event object supplied by the '<Return>' binding, or None when
               called from the 'prompt' button; it is not used.
        nm  -- key of the widget that triggered the call.
        ent -- entry widget whose text is the mode's value (None for 'prompt').
        """
        if nm == 'su.pb':
            su._able('su.pb')
            su.mode = 'prompt,'+'1'
        else:
            su._able(nm)
            for key in su.formd:
                if key == nm:
                    if 'tm' in key: modestr = 'timer'
                    elif 'rn' in key: modestr = 'runs'
                    su.formd[key]['bg']=BG
                    # The entry text is stored as typed; it is not checked
                    # for being a number here.
                    su.mode = ','.join([modestr,str(ent.get())])
                    break

    def _margin(su,txt):
        """Return an x position left of su.margin, 8 units per character of txt."""
        return su.margin-(len(txt)*8)

    def send(su):
        """Write su.mode to the pipe and close the window."""
        # NOTE(review): su.mode is still '' when no mode was chosen, so an
        # empty message can be sent - confirm the reader copes with that.
        pipewrite(su.fw,su.mode)
        su.close()

    def stop(su):
        """Write the literal message 'stop' to the pipe and close the window."""
        pipewrite(su.fw,'stop')
        su.close()
 
    def close(su):
        """Destroy the window."""
        su.w.destroy()

class Control():
    """Control window of the pipe test; reports the user's choice via a pipe.

    Depending on `proc` the window shows, besides the 'stop' controls:
      'start'  -- a 'start' button, which sends 'startup'
      'prompt' -- a '--- next ---' button, which sends 'prompt'
      'timer'  -- the number of seconds and a 'confirm' button
      'runs'   -- the number of runs and a 'confirm' button
    The 'stop' button and the corner button both send 'stop'.

    The instance parameter is named `c` rather than the conventional `self`.
    Creating an instance builds the window and runs the Tk main loop inside
    __init__.
    """
    def __init__(c, pipefwrite=None, proc=None):
        """Build the control window and run its event loop.

        pipefwrite -- write file descriptor passed to pipewrite().
        proc       -- None, or a pair (name, value): `name` selects the extra
                      controls (a key of c.procd) and `value` is the number
                      shown for the 'timer' and 'runs' modes.
        """
        c.fw = pipefwrite
        if proc:
            # Always store the value, even when it is falsy (e.g. 0), so that
            # _tmui()/_rnui() can rely on c.procv existing.
            c.proc, c.procv = proc[0], proc[1]
        else:
            c.proc = proc
            c.procv = None
        c.procd = {'start':c._strtui, 'prompt':c._prui, 'timer':c._tmui, 'runs':c._rnui}
        c.w = tk.Tk()
        c.w.geometry('100x200-60+80')
        c.w.overrideredirect(1)
        c.w['bg'] = BG
        # Creates a Tk font named 'uifnt'; nothing in this class refers to it.
        c.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
        # Small button in the corner: sends 'stop' and closes the window.
        c.lvb = tk.Button(c.w, bg=BLK, activebackground=BG, relief='flat', command=c.stop)
        c.lvb.pack()
        c.lvb.place(width=15,height=15, x=80,y=10)
        c.title = tk.Label(c.w,bg=BG, text='pipe test\nControl')
        c.title.pack()
        c.title.place(x=5,y=5)
        # Coloured marker beside the 'stop' button.
        c.stpclr = tk.Label(c.w,bg=STOP)
        c.stpclr.pack()
        stpy = 160
        c.stpclr.place(width=7,height=7,x=2,y=stpy+10)
        c.stopb = tk.Button(c.w, bg=BG, text='stop', cursor='hand2', relief='flat', activebackground=BG, command=c.stop)
        c.stopb.pack()
        c.stopb.place(x=10,y=160)
        # Without a proc only the 'stop' controls are shown; an unknown name
        # still raises KeyError.
        if c.proc is not None:
            c.procd[c.proc]()
        c.w.mainloop()

    def _strtui(c):
        """Add the coloured marker and the 'start' button."""
        c.strtclr = tk.Label(c.w,bg=START)
        c.strtclr.pack()
        strty = 60
        c.strtclr.place(width=7,height=7,x=2,y=strty+10)
        c.startb = tk.Button(c.w, bg=BG, text='start', cursor='hand2', relief='flat', activebackground=BG, command=c.strtup)
        c.startb.pack()
        c.startb.place(x=10,y=strty)

    def __write(c,s):
        """Write the string `s` to the pipe and close the window."""
        pipewrite(c.fw,s)
        c.close()

    def _prui(c):
        """Add the '--- next ---' button, which sends 'prompt'."""
        prb = tk.Button(c.w,bg=TBG, text='--- next ---', activebackground=BG, relief='flat',cursor='hand2', command=partial(c.__write,'prompt'))
        prb.pack()
        prb.place(x=10,y=80)

    def __confirm(c):
        """Add the 'confirm' button, which sends 'confirm'."""
        cb = tk.Button(c.w, bg=TBG, text='confirm', activebackground=BG, relief= 'flat', cursor='hand2', command=partial(c.__write,'confirm'))
        cb.pack()
        cb.place(x=20,y=120)

    def _tmui(c):
        """Show the number of seconds to run for and the 'confirm' button."""
        tmt = ''.join(['run for\n',str(c.procv),' seconds'])
        tmlbl = tk.Label(c.w,bg=BG, text=tmt)
        tmlbl.pack()
        tmlbl.place(x=10,y=80)
        c.__confirm()

    def _rnui(c):
        """Show the number of runs and the 'confirm' button."""
        rnt = ''.join(['run\n ',str(c.procv),' times'])
        rnlbl = tk.Label(c.w,bg=BG, text=rnt)
        rnlbl.pack()
        rnlbl.place(x=10,y=80)
        c.__confirm()

    def strtup(c):
        """Write the literal message 'startup' to the pipe and close the window."""
        pipewrite(c.fw,'startup')
        c.close()

    def stop(c):
        """Write the literal message 'stop' to the pipe and close the window."""
        pipewrite(c.fw,'stop')
        c.close()

    def close(c):
        """Destroy the window."""
        c.w.destroy()

def once():
    """Run a single send/receive round over a fresh pipe.

    The sender window is given the write end and runs first; the receiver
    window is then given the read end.
    """
    read_end, write_end = ospipe()
    Sloop(write_end)
    Rloop(read_end)

def many(mkey,mint=1):
    """Repeat send/receive rounds according to the chosen mode.

    modes are ('prompt',1), ('timer',secs), ('runs',runs)

    mkey -- mode name: 'timer', 'runs' or 'prompt'.
    mint -- number of seconds ('timer') or number of rounds ('runs');
            not used by 'prompt'.

    'timer' and 'runs' first show a Control window asking for confirmation.
    In 'timer' mode the time is counted from that confirmation, and a new
    round is started as long as no more than `mint` seconds have passed.
    'prompt' runs one round, then asks through a Control window whether to
    run another.

    Returns True when the rounds were run, False when the user answered
    'stop' to the confirmation or when `mkey` / the reply is not recognised.
    """
    if mkey == 'timer':
        rec = setpipe(Control,sub='timer',vars=mint)
        if rec == 'confirm':
            # Measure from the moment of confirmation, so that time spent in
            # earlier windows does not eat into the requested duration.
            began = clock()
            while clock() - began <= mint:
                once()
            return True
        elif rec == 'stop':
            return False
    elif mkey == 'runs':
        rec = setpipe(Control,sub='runs',vars=mint)
        if rec == 'confirm':
            for _ in range(mint):
                once()
            return True
        elif rec == 'stop':
            return False
    elif mkey == 'prompt':
        finished = False
        while not finished:
            once()
            rec = setpipe(Control,sub='prompt')
            # Any reply other than 'prompt' ends the series.
            if rec != 'prompt':
                finished = True
        return True
    # Unknown mode or unexpected reply: report "not run" explicitly
    # (this used to be an implicit None, which is falsy as well).
    return False

def testui():
    """Top-level driver: alternate between the Control and Startup windows.

    Each pass shows the Control window with a 'start' button. If the user
    starts, the Startup window asks for a mode and the chosen mode is run
    through many(). The loop ends when the Control window reports anything
    other than 'startup', or when many() returns a false value.
    """
    incontrol = True
    while incontrol:
        rec = setpipe(Control,sub='start')
        if rec != 'startup':
            break
        rec = setpipe(Startup)
        if rec == 'stop':
            # Startup was cancelled: show the Control window again.
            continue
        modes, _, ns = rec.partition(',')
        try:
            count = int(ns)
        except ValueError:
            # No mode was chosen (empty message) or the value typed is not a
            # whole number: go back to the Control window instead of crashing.
            continue
        incontrol = many(modes,count)

# Start the interactive test only when this file is run as a script.
if __name__ == '__main__':
    testui()


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 InhirCode