Websocket connection in multiprocessing
I want to run a websocket connection under Python multiprocessing. My code looks like this:
```python
import logging
import multiprocessing

import websocket_threads
import queue_and_trade_threads
from utils.logger_utils import LOGGER_FORMAT, listener_configurer, listener_process, worker_configurer

# (The import of GmoWebsocket is omitted in the original post.)


def main():
    logging.basicConfig(level=logging.INFO, format=LOGGER_FORMAT)
    gmo_ws = GmoWebsocket()
    symbol = "BTC_JPY"
    time_span = 5
    max_orderbook_table_rows = 1000
    max_tick_table_rows = 1000
    max_ohlcv_table_rows = 1000
    logging_queue = multiprocessing.Queue(-1)
    logging_process = multiprocessing.Process(
        target=listener_process,
        args=(
            logging_queue,
            listener_configurer,
        ),
    )
    logging_process.start()
    p1 = multiprocessing.Process(
        target=websocket_threads.main,
        args=(
            symbol,
            gmo_ws,
            logging.INFO,
            logging_queue,
            worker_configurer,
        ),
    )
    p1.start()
    p2 = multiprocessing.Process(
        target=queue_and_trade_threads.main,
        args=(
            symbol,
            time_span,
            max_orderbook_table_rows,
            max_tick_table_rows,
            max_ohlcv_table_rows,
            gmo_ws,
            logging.INFO,
            logging_queue,
            worker_configurer,
        ),
    )
    p2.start()
    p1.join()
    p2.join()
    logging_process.join()


if __name__ == "__main__":
    main()
```
I'm getting an error while pickling websocket_threads (my custom code). I got:
```
poetry run python ./gmo_websocket/connect.py
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'TaskStepMethWrapper' object
Traceback (most recent call last):
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/akiranoda/.pyenv/versions/3.9.7/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_asyncio.Future' object
```

(The _asyncio.Future traceback repeats twice more.)
I searched around and understand that some asyncio objects can't be pickled, so I either have to redesign my application or fix this problem directly. Do you have any suggestions for a new design or a solution?
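To illustrate the root cause, here is a minimal sketch added for context (not code from the original post): multiprocessing pickles whatever crosses a process boundary, both process arguments (under the spawn start method, the default on macOS) and anything put on a multiprocessing.Queue, and asyncio primitives reject pickling.

```python
import asyncio
import pickle

# Minimal sketch: pickling any object graph that reaches an asyncio
# primitive raises the same TypeError seen in the traceback above.
loop = asyncio.new_event_loop()
future = loop.create_future()
try:
    pickle.dumps(future)
except TypeError as exc:
    print(exc)  # cannot pickle '_asyncio.Future' object
finally:
    loop.close()
```

An object like GmoWebsocket that holds futures or tasks internally fails the same way as soon as it is handed to a child process or a queue.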
I want to:

- Run websocket_threads and queue_and_trade_threads in two processes.
- Use a Queue to share data (a minimal sketch of this pattern follows this list).
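Here is that sketch. The produce and consume helpers are hypothetical placeholders, not the project's functions; the point is that only plain, picklable payloads such as dicts ever cross the queue, never live asyncio objects.

```python
import multiprocessing


def produce(queue: multiprocessing.Queue) -> None:
    # Put only plain, picklable payloads (dicts, tuples) on the queue.
    for i in range(3):
        queue.put({"symbol": "BTC_JPY", "tick": i})
    queue.put(None)  # sentinel: tells the consumer to stop


def consume(queue: multiprocessing.Queue) -> None:
    # Block on the queue until the sentinel arrives.
    while (item := queue.get()) is not None:
        print("received", item)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(q,))
    consumer = multiprocessing.Process(target=consume, args=(q,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
```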
Solution 1:
```python
import logging
from typing import Tuple, Optional
import multiprocessing

from dotenv import load_dotenv

from queue_and_trade_manager import QueueAndTradeManager
import websocket_threads
import queue_and_trade_threads
from utils.logger_utils import LOGGER_FORMAT, listener_configurer, listener_process

# Load .env file
load_dotenv()


def main_process(
    symbol: str,
    queue_and_trade_manager: QueueAndTradeManager,
    logging_level: int,
    logging_queue: Optional[multiprocessing.Queue] = None,
):
    """Main process: runs the websocket connection without leaving this process.

    Args:
        symbol (str): Name of symbol.
        queue_and_trade_manager (QueueAndTradeManager): GMO websocket manager.
        logging_level (int): Logging level.
        logging_queue (multiprocessing.Queue): Logging queue for multiprocessing. Default is None.
    """
    websocket_threads.main(
        symbol=symbol,
        queue_and_trade_manager=queue_and_trade_manager,
        logging_level=logging_level,
        logging_queue=logging_queue,
    )


def sub_processes(
    symbol: str,
    time_span: int,
    max_orderbook_table_rows: int,
    max_tick_table_rows: int,
    max_ohlcv_table_rows: int,
    queue_and_trade_manager: QueueAndTradeManager,
    logging_level: int,
    logging_queue: multiprocessing.Queue,
) -> Tuple[multiprocessing.Process, multiprocessing.Process]:
    """Sub processes: the logging process and the manage_queue_and_trade process.

    Args:
        symbol (str): Name of symbol.
        time_span (int): Time span (seconds).
        max_orderbook_table_rows (int): Max number of orderbook table rows.
        max_tick_table_rows (int): Max number of tick table rows.
        max_ohlcv_table_rows (int): Max number of ohlcv table rows.
        queue_and_trade_manager (QueueAndTradeManager): Queue manager class.
        logging_level (int): Logging level.
        logging_queue (multiprocessing.Queue): Queue for multiprocessing logging.

    Returns:
        (logging_process, queue_and_trade_process)
    """
    logging_process = multiprocessing.Process(
        target=listener_process,
        args=(
            logging_queue,
            listener_configurer,
        ),
    )
    queue_and_trade_process = multiprocessing.Process(
        target=queue_and_trade_threads.main,
        args=(
            symbol,
            time_span,
            max_orderbook_table_rows,
            max_tick_table_rows,
            max_ohlcv_table_rows,
            queue_and_trade_manager,
            logging_level,
            logging_queue,
        ),
    )
    # The caller is responsible for starting and joining these processes.
    return logging_process, queue_and_trade_process


if __name__ == "__main__":
    logging_level = logging.INFO
    logging_queue = multiprocessing.Queue(-1)
    logging.basicConfig(level=logging_level, format=LOGGER_FORMAT)
    queue_and_trade_manager = QueueAndTradeManager()
    symbol = "BTC_JPY"
    time_span = 5
    max_orderbook_table_rows = 1000
    max_tick_table_rows = 1000
    max_ohlcv_table_rows = 1000
    logging_process, queue_and_trade_process = sub_processes(
        symbol=symbol,
        time_span=time_span,
        max_orderbook_table_rows=max_orderbook_table_rows,
        max_tick_table_rows=max_tick_table_rows,
        max_ohlcv_table_rows=max_ohlcv_table_rows,
        queue_and_trade_manager=queue_and_trade_manager,
        logging_level=logging_level,
        logging_queue=logging_queue,
    )
    logging_process.start()
    queue_and_trade_process.start()
    # The websocket runs in the main process, so it is never pickled.
    main_process(
        symbol=symbol,
        queue_and_trade_manager=queue_and_trade_manager,
        logging_level=logging_level,
    )
    logging_process.join()
    queue_and_trade_process.join()
```
I solved this problem indirectly: I run the websocket connection in the main process and run the other workers in separate processes with multiprocessing, so the unpicklable asyncio objects never have to cross a process boundary.
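As a closing note added for context (not part of the original answer): the same constraint can also be satisfied the other way around, by passing plain constructor arguments to the child and building the unpicklable object inside it. A hypothetical sketch:

```python
import multiprocessing


def worker(symbol: str) -> None:
    # Hypothetical: construct the asyncio-based client here, inside the
    # child process, instead of receiving a live instance as an argument.
    # client = GmoWebsocket(symbol)  # placeholder for the real constructor
    print(f"child process builds its own client for {symbol}")


if __name__ == "__main__":
    process = multiprocessing.Process(target=worker, args=("BTC_JPY",))
    process.start()
    process.join()
```

Only the string "BTC_JPY" is pickled; the client itself never leaves the process that created it.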
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Akira Noda |
