'The publishing code below is stalling on time.sleep() how can i change this behaviour + what would be a good way to implement a thread safe cach
For refreshing skills for a new job i have been trying to implement a very simple price publisher + client + mtm_cache. In the code below in publishing.py the code stops at time.sleep() and doesnt seem to wakeup (say for example two clients connect, but they stop receiving the messages while publisher executes time.sleep()), how can i correct the behavior of the publisher. Also there is another class MTMCache where i hope to store the MTM sent by the client, would a dictionary be a good thread safe cache which stores list of tuples (time,MTMValue) per client. As a third point how can i change the implementation of this Publisher class using asyncio?
publishing.py
import threading
import socket
import time
import random
class Publisher(object):
def __init__(self):
self.list_subscribers = []#list of tuples with ip and port
self.mtm_cache = None
self.current_price = 50
self.listening_ports = [1100,1101,1102,1103,1104]
self.stop_threads = False
self.listening_port = 1100
self.sock = None
self.listening = False
#run listening function in another thread
def update_price(self):
tmp = random.randint(-10,10)
#print("Number generated is " + str(tmp))
self.current_price = self.current_price + tmp
def update_price_in_loop(self):
while(True):
self.update_price()
if(self.stop_threads):
break
def send_price_to_subscribers(self):
#in a parallel for loop send current price to subscribers
while(True):
#print(str(self.current_price))
if self.list_subscribers:
for cl in self.list_subscribers:
cl.send(str(self.current_price).encode())
if(self.stop_threads):
break
def start_listener_and_accept_connections(self):
if(not self.listening):
self.sock = socket.socket()
self.sock.bind(('',self.listening_port))
print("sock bound at ", self.listening_port)
while True:
if not self.listening:
print("listening for connections")
self.sock.listen(5)
#above is a blocking call, it is blocking other threads
self.listening = True # how to properly set and utilise this flag
c, addr = self.sock.accept()
print('Got connection from', addr)
self.list_subscribers.append(c)
if(self.stop_threads):
print("closing client connections")
for cl in self.list_subscribers:
cl.close()
self.sock.close()
break
class MTMCache(object):
def __init__(self,publisher):
self.publisher = publisher
self.cache = {} #subscriber token for each of the subscribers in publisher class, currenttime and mtm to be stored here, maybe some other structure besides dict could be used, explore that later
def receive_mtm(self,message):
#message could be string in json format
#have to design this method with appropriate data structures
pass
if __name__ == "__main__":
from threading import *
import time
pub = Publisher()
#pub.update_price()
#print(pub.current_price)
t = Thread(target = pub.update_price_in_loop)
t1 = Thread(target = pub.send_price_to_subscribers)
#t.start()
#t1.start()
t2 = Thread(target = pub.start_listener_and_accept_connections)
t2.start()
t.start()
t1.start()
time.sleep(5)
pub.stop_threads = True
#t2.raise_exception()
#t.raise_exception()
#t1.raise_exception()
t2.join()
t.join()
t1.join()
------
basic_client.py
import socket
s = socket.socket()
port = 1100
s.connect(('127.0.0.1', port))
while(True):
tmp = s.recv(1024).decode()
print(tmp)
if not tmp:
break
s.close()
----
in separate terminals:
python3 publishing.py
term2
python3 basic_client.py
term3
python3 basic_client.py
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
