Websocket connection in multiprocessing

I want to run a websocket connection inside a Python multiprocessing setup. My code looks like this:

import websocket_threads
import queue_and_trade_threads
from utils.logger_utils import LOGGER_FORMAT, listener_configurer, listener_process, worker_configurer

def main():
    """Spawn the logging listener, websocket, and queue-and-trade processes.

    NOTE(review): ``gmo_ws`` is placed into the ``args`` tuple of both worker
    processes. ``multiprocessing`` pickles those args when spawning, and the
    tracebacks below show the pickling fails on asyncio objects — presumably
    held internally by ``GmoWebsocket``; verify what state that class keeps.
    """
    logging.basicConfig(level=logging.INFO, format=LOGGER_FORMAT)

    # Shared websocket client handed to both worker processes (the pickling
    # failure originates from passing this object across process boundaries).
    gmo_ws = GmoWebsocket()
    symbol = "BTC_JPY"
    time_span = 5  # seconds; sampling span for OHLCV aggregation (TODO confirm)
    max_orderbook_table_rows = 1000
    max_tick_table_rows = 1000
    max_ohlcv_table_rows = 1000

    # Queue that funnels log records from the workers to the listener process.
    logging_queue = multiprocessing.Queue(-1)

    # Dedicated process that drains logging_queue and writes the records.
    logging_process = multiprocessing.Process(
        target=listener_process,
        args=(
            logging_queue,
            listener_configurer,
        ),
    )

    logging_process.start()

    # Worker 1: maintains the websocket connection for `symbol`.
    p1 = multiprocessing.Process(
        target=websocket_threads.main,
        args=(
            symbol,
            gmo_ws,
            logging.INFO,
            logging_queue,
            worker_configurer,
        ),
    )
    p1.start()
    # Worker 2: consumes queued data and manages trading tables.
    p2 = multiprocessing.Process(
        target=queue_and_trade_threads.main,
        args=(
            symbol,
            time_span,
            max_orderbook_table_rows,
            max_tick_table_rows,
            max_ohlcv_table_rows,
            gmo_ws,
            logging.INFO,
            logging_queue,
            worker_configurer,
        ),
    )
    p2.start()

    # Block until all child processes terminate.
    p1.join()
    p2.join()
    logging_process.join()

I'm getting an error while pickling websocket_threads (my own code).

I got

poetry run python ./gmo_websocket/connect.py
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'TaskStepMethWrapper' object
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_asyncio.Future' object
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_asyncio.Future' object
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_asyncio.Future' object

The error occurs at this line.

I searched online and understand that some asyncio objects cannot be pickled. I need to either redesign my application or fix this problem directly. Do you have any suggestions for a new design or a solution?

I want

  1. Run websocket_threads and queue_and_trade_threads in two processes.
  2. Use Queue to share data.


Solution 1:[1]

import logging
import multiprocessing
from typing import Optional, Tuple, Union

from dotenv import load_dotenv

import queue_and_trade_threads
import websocket_threads
from queue_and_trade_manager import QueueAndTradeManager
from utils.logger_utils import LOGGER_FORMAT, listener_configurer, listener_process

# Load .env file
load_dotenv()


def main_process(
    symbol: str,
    queue_and_trade_manager: QueueAndTradeManager,
    logging_level: Union[str, int],
    logging_queue: Optional[multiprocessing.Queue] = None,
) -> None:
    """Run the websocket threads in the current (main) process.

    Running the websocket here, rather than in a child process, avoids
    pickling the manager's asyncio objects — multiprocessing cannot
    serialize them, which caused the original TypeError.

    Args:
        symbol (str): Name of the symbol (e.g. "BTC_JPY").
        queue_and_trade_manager (QueueAndTradeManager): GMO websocket manager.
        logging_level (Union[str, int]): Logging level name or numeric value
            (e.g. "INFO" or logging.INFO).
        logging_queue (Optional[multiprocessing.Queue]): Queue for
            multiprocessing-aware logging. Default is None.
    """
    websocket_threads.main(
        symbol=symbol,
        queue_and_trade_manager=queue_and_trade_manager,
        logging_level=logging_level,
        logging_queue=logging_queue,
    )

def sub_processes(
    symbol: str,
    time_span: int,
    max_orderbook_table_rows: int,
    max_tick_table_rows: int,
    max_ohlcv_table_rows: int,
    queue_and_trade_manager: QueueAndTradeManager,
    logging_level: Union[str, int],
    logging_queue: multiprocessing.Queue,
) -> Tuple[multiprocessing.Process, multiprocessing.Process]:
    """Build (but do not start) the logging and queue-and-trade processes.

    The caller is responsible for calling ``start()`` and ``join()`` on the
    returned processes; this function only constructs them.

    Args:
        symbol (str): Name of the symbol.
        time_span (int): Time span in seconds.
        max_orderbook_table_rows (int): Max number of orderbook table rows.
        max_tick_table_rows (int): Max number of tick table rows.
        max_ohlcv_table_rows (int): Max number of ohlcv table rows.
        queue_and_trade_manager (QueueAndTradeManager): Queue manager instance.
        logging_level (Union[str, int]): Logging level name or numeric value.
        logging_queue (multiprocessing.Queue): Queue for multiprocessing logging.

    Returns:
        Tuple[multiprocessing.Process, multiprocessing.Process]:
            (logging_process, queue_and_trade_process), both unstarted.
    """
    # Listener process: drains logging_queue and emits the records.
    logging_process = multiprocessing.Process(
        target=listener_process,
        args=(
            logging_queue,
            listener_configurer,
        ),
    )

    # Worker process: consumes queued market data and manages trading tables.
    queue_and_trade_process = multiprocessing.Process(
        target=queue_and_trade_threads.main,
        args=(
            symbol,
            time_span,
            max_orderbook_table_rows,
            max_tick_table_rows,
            max_ohlcv_table_rows,
            queue_and_trade_manager,
            logging_level,
            logging_queue,
        ),
    )

    return logging_process, queue_and_trade_process


if __name__ == "__main__":
    # Configure root logging and the inter-process log queue up front.
    level = logging.INFO
    logging.basicConfig(level=level, format=LOGGER_FORMAT)
    log_queue = multiprocessing.Queue(-1)

    manager = QueueAndTradeManager()
    # Table/aggregation configuration shared by the worker process.
    config = {
        "symbol": "BTC_JPY",
        "time_span": 5,
        "max_orderbook_table_rows": 1000,
        "max_tick_table_rows": 1000,
        "max_ohlcv_table_rows": 1000,
    }

    logging_process, queue_and_trade_process = sub_processes(
        queue_and_trade_manager=manager,
        logging_level=level,
        logging_queue=log_queue,
        **config,
    )

    logging_process.start()
    queue_and_trade_process.start()

    # The websocket runs in the main process so its asyncio objects are
    # never pickled across a process boundary.
    main_process(
        symbol=config["symbol"],
        queue_and_trade_manager=manager,
        logging_level=level,
    )

    logging_process.join()
    queue_and_trade_process.join()

I solved this problem indirectly: I run the websocket connection in the main process and launch the other workers as separate processes using multiprocessing.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Akira Noda