'Cannot get Multiprocessing working in GUI + PySerial

I am working on a PyQt5 GUI to read data over bluetooth serial and plot in real-time. I have experimented with multithreading to split up the 'data acquisition' by reading the serial port with PySerial in a child thread, as well as the main GUI thread, however the real-time plotting is very laggy and I get bad data packets (which I believe is due to the multithreaded nature). Since these are CPU intensive tasks, I am attempting to shift towards multiprocessing, however I have struggled to get anything working so far:

  • Problem: Couldn't pickle the Class object to use with Queue(). Solution: Moved serial_run_data outside Class object so that it could be pickable (I also tried pathos.multiprocessing with no success)
  • Problem: GUI would plot a couple of samples then hang (probably to do with the queue communication). Solution: Attempt to use a Pipe() instead (which brings me to my current issue)

I am not entirely sure my multiprocessing.Pipe implementation is correct but I haven't gotten there yet because of the following error:

Process Process-1:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/juan/Desktop/Force Plate/FORCE PLATES/venv/force_test.py", line 38, in serial_run_data
    ser.flushInput()
  File "/Users/juan/Desktop/Force Plate/FORCE PLATES/venv/lib/python3.8/site-packages/serial/serialutil.py", line 588, in flushInput
    self.reset_input_buffer()
  File "/Users/juan/Desktop/Force Plate/FORCE PLATES/venv/lib/python3.8/site-packages/serial/serialposix.py", line 683, in reset_input_buffer
    self._reset_input_buffer()
  File "/Users/juan/Desktop/Force Plate/FORCE PLATES/venv/lib/python3.8/site-packages/serial/serialposix.py", line 677, in _reset_input_buffer
    termios.tcflush(self.fd, termios.TCIFLUSH)
termios.error: (9, 'Bad file descriptor')
Child process started

The GUI has too many parts to post so I will try my luck with the relevant snippets of the code that use/ call multiprocessing and the PySerial libraries:

import multiprocessing as mp
import serial.tools.list_ports

# Inside the GUI Class Initialisation
    self.pipeReceive, self.pipeSend = mp.Pipe(duplex=False)
    self.serial_flag = mp.Event()

# This function sits outside GUI Class (Child Process) and obtains serial data
def serial_run_data(flag, pipeSend, SOL, EOL, ser):
    print('Child process started')
    ser.flushInput()
    while True:
        if flag.is_set():
            pipeSend.close()
            break
        value = ser.read_between(SOL, EOL, 24)  # Start, end character, 24 bytes of data
        if len(value) == 24 and EOL in value and SOL in value:
            # queue.put_nowait(value)
            pipeSend.send_bytes(value)
        else:
            print(f"{len(value)}  {value}")

    # This function runs in Main Process and collects serial data for plotting
    def serial_data(self):  # Obtain data from serial and store
        # rxBuffer = [self.queueRaw.get_nowait() for _ in range(self.queueRaw.qsize())]
        print(self.pipeReceive.recv_bytes(24))
        while self.pipeReceive.poll():
            rxBuffer = self.pipeReceive.recv_bytes(24)  # read 24 bytes
        for bytes in rxBuffer:
            self.y1.append(int.from_bytes(bytes[3:6], "little", signed=False))
            self.y2.append(int.from_bytes(bytes[7:10], "little", signed=False))
            self.y3.append(int.from_bytes(bytes[11:14], "little", signed=False))
            self.y4.append(int.from_bytes(bytes[15:18], "little", signed=False))
            self.x.append(int.from_bytes(bytes[19:21], "little", signed=False))

    # This function inside GUI Class starts the child process to begin data acquisition
    def start_plot(self):  # Start/ Stop Button
        if self.start_button.text() == 'START':
            if not self.error:
                # self.t = threading.Thread(target=self.serial_run_data, args=(self.serial_flag, self.queueRaw, self.SOL, self.EOL, self.ser,))
                # self.t.start()
                self.p = mp.Process(target=serial_run_data, args=(self.serial_flag, self.pipeSend, self.SOL, self.EOL, self.ser,))
                self.p.start()

# This function initialises the serial com port 
def serial_setup(self):
    # ports = serial.tools.list_ports.comports()
    com_list = "/dev/cu.ESP32Test-ESP32SPP"
    if not self.open_com:
        try:
            self.ser = serial.Serial(com_list, baudrate=921600)
            self.ser.close()
            self.error = False
            self.ser.open()
            self.open_com = True

I replicated the same code using multithreading with no errors, and I believe the issue here might be to do with the child process creating a copy of the ser Serial object, but I have no idea how to fix that. I have the following questions:

  1. Is my multiprocessing implementation incorrect, so as to cause the errors with the PySerial library?
  2. If the above is correct (for multiprocessing), and the local copy of the com port object is the issue, how do I fix that?
  3. In a general sense, is there a better approach to inter-process communication other than a one-way pipe as I have attempted here?


Solution 1:[1]

I asked this question a bit prematurely, so here is the solution for anyone in the future - initialise the serial port object globally and there shouldn't be any issues when using multiprocessing. The Pipe() implementation I posted also works as was hoped.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 paeion