'Process async results in another thread - app architecture (python-3.7)
I have a programm that receives Data (Trades) from the Binance API. This data will be processed and visualized in a web-app with dash and plotly.
In order to get best performance and the slightest delay my program has 3 threads:
Thread 1 - Binance API - get requests - Trades
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
...
def start_thread_1():
loop.run_until_complete(main(api_key,secret_key))
async def main(api_key,secret_key):
client = await AsyncClient.create(api_key,secret_key)
await trades_listener(client)
async def trades_listener(client):
bm = BinanceSocketManager(client)
symbol = 'BTCUSDT'
async with bm.trade_socket(symbol=symbol) as stream:
while True:
msg = await stream.recv()
event_type = msg['e']
...
trade = Trade(event_type,...)
# <-- safe trade SOMEWHERE to process in other thread ? safe to: process_trades_list
Thread 2 - Web App - Displays Trades and Processed Trades Data
web-thread = threading.Thread(target=webserver.run_server)
...
not worth to mention
Thread 3 - Process Data - Process Trades (calculate RSI, filter big trades, etc)
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
web-thread = threading.Thread(target=webserver.run_server)
process-thread = threading.Thread(target=start_thread_3)
...
.start()
.sleep()
etc.
.join()
def start_thread_3():
process_trades()
def process_trades():
global process_trades_list
while True:
while len(process_trades_list) > 0:
trade = process_trades_list[0]
process_trades_list.pop(0)
# ...do calculation etc.
HOW can I safe / hand over the data from thread_1 / async thread to thread_3?
I tried to put the trades to a list called process_trades_list and then loop while len(process_trades_list) > 0 all trades.
In the loop I pop() processed trades from the list - but this somehow seems to break the program without throwing errors.
What's best way to get this done?
It is possible that the async stream get's spammed by new incoming trades and I want to minimalize the load..
Solution 1:[1]
Here you want a queue.Queue instead of a list. Your last code snippet would look something like this:
import queue
if __name__ == "__main__":
try:
q = queue.Queue()
binance_thread = threading.Thread(target=start_thread_1,
args=(q,))
web_thread = threading.Thread(target=webserver.run_server)
process)thread = threading.Thread(target=process_trades,
args=(q,), daemon=True)
...
.start()
.sleep()
etc.
.join()
def process_trades(q):
while True:
trade = q.get()
# ...do calculation etc.
I eliminated the call to get_event_loop since you didn't use the returned object. I eliminated the start_thread_3 function, which is not necessary.
I made thread-3 a daemon, so it will not keep your application open if everything else is finished.
The queue should be created once, in the main thread, and passed explicitly to each thread that needs to access it. That eliminates the need for a global variable.
The process trade function becomes much simpler. The q.get() call blocks until an object is available. It also pops the object off the queue.
Next you must also modify thread-1 to put objects onto the queue, like this:
def start_thread_1(q):
asyncio.run(main(api_key,secret_key, q))
async def main(api_key,secret_key, q):
client = await AsyncClient.create(api_key,secret_key)
await trades_listener(client, q)
async def trades_listener(client, q):
bm = BinanceSocketManager(client)
symbol = 'BTCUSDT'
async with bm.trade_socket(symbol=symbol) as stream:
while True:
msg = await stream.recv()
event_type = msg['e']
...
trade = Trade(event_type,...)
q.put(trade)
The q.put function is how you safely put a trade object into the queue, which will then result in activity in thread-3.
I modified the start_thread1 function: here is a good place to start the event loop mechanism for this thread.
You ask about avoiding spam attacks on your program. Queues have methods that allow you to limit their size, and possibly throw away trades if they become full.
I don't understand what you are trying to do with the if __name__ == '__main__' logic in thread-1. The program can have only one entry point, and only one module named '__main__'. It looks to me like that has to be thread-3.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Paul Cornelius |
