Value processing in each process in Python Pool()
I want to compute, in real time, the difference between two consecutive values from each of many data sources, so I am using Pool() to run the functions in parallel in an infinite loop. The problem is that the "next" process can't get the previous data to calculate with, because I declare the variables as globals in the function.
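A minimal stdlib-only sketch of why the globals look reset. A Pool `initializer` runs once in every worker process, each worker keeps its own private copy of the module globals, and `map()` hands each task to whichever worker picks it up. The names `init_worker`, `task` and `run_rounds` are illustrative and do not come from the code below.

```python
import os
from multiprocessing import Pool


def init_worker():
    # Runs once in *each* worker process, not once for the whole pool.
    global counter
    counter = 0


def task(name):
    # Increments this worker's private copy of `counter`; other workers never see it.
    global counter
    counter += 1
    return name, os.getpid(), counter


def run_rounds(rounds, names, workers=4):
    results = []
    with Pool(workers, init_worker) as pool:
        for _ in range(rounds):
            results.extend(pool.map(task, names))
    return results


if __name__ == "__main__":
    for name, pid, count in run_rounds(3, ["source-1", "source-2", "source-3"]):
        print(name, pid, count)
```

The name/pid pairing changes from run to run. A worker only sees a non-zero value if it happened to process some task earlier, and that value may belong to a different source, which matches the "always 0, but sometimes randomly gets the data" symptom.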
```python
import json
import time
import traceback
from multiprocessing import Pool
from urllib.request import Request, urlopen

from dateutil import parser  # parser.parse() is assumed to come from python-dateutil

# `header` (HTTP headers for Request) and `export()` are defined elsewhere (not shown).


def initializer():
    # Runs once in each of the 25 worker processes.
    global height
    global block_time_2
    height = 0
    block_time_2 = 0


def query(name, rpc):
    global height  # This is the cause of the problem: it is always 0
    block_time_1 = 0  # the first value
    try:
        # Get the latest block from the RPC endpoint.
        request = Request("{}/block".format(rpc), None, header)
        data = json.loads(urlopen(request).read())
        if height != int(data['result']['block']['header']['height']):
            height = int(data['result']['block']['header']['height'])
            block_time_1 = parser.parse(data['result']['block']['header']['time'])  # first value is updated
            blockTime = getBlockTime(name, block_time_1, height, rpc)  # get the second value and calculate the difference
            return name, blockTime  # (name, value) pair so dict() in the main loop works
        else:
            return name, None
    except Exception:
        traceback.print_exc()
        return name, None


def getBlockTime(name, block_time_1, height, rpc):
    # Second value. Pool doesn't share memory among processes, so there is no need
    # to declare an array for the different data sources. After the calculation it is
    # replaced with the first value, so the fetch below should run less often.
    global block_time_2
    print("{} time block 2: {}".format(name, block_time_2))  # always 0, but sometimes randomly gets the data
    if block_time_2 != 0:
        blockTime = (block_time_1 - block_time_2).total_seconds()
        block_time_2 = block_time_1
        return blockTime
    else:
        print("{} block time 2 recalculated".format(name))
        request = Request("{}/block?height={}".format(rpc, height - 1), None, header)
        data = json.loads(urlopen(request).read())
        block_time_2 = parser.parse(data['result']['block']['header']['time'])
        blockTime = (block_time_1 - block_time_2).total_seconds()
        block_time_2 = block_time_1
        return blockTime


def multi_query(args):
    return query(*args)


if __name__ == '__main__':
    # Tried several approaches, none of which helped:
    # manager = Manager()
    # chainsData = manager.dict()
    # q = Queue()
    p = Pool(25, initializer, ())
    with open('/home/source-data.json') as f:
        data = json.load(f)
    while True:
        chainsData = dict(p.map(multi_query, [
            (data["source-1"]["name"], data["source-1"]["rpc"]),
            (data["source-2"]["name"], data["source-2"]["rpc"]),
            (data["source-3"]["name"], data["source-3"]["rpc"]),
        ]))
        export(chainsData)  # export() works; the only problem is that the RPC is queried far too often
        print("done\n---------")
        time.sleep(1)
```
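One way to avoid relying on worker globals at all, sketched here under assumptions: keep each source's previous (height, time) pair in the parent process, pass it into the task as an argument, and have the task return the updated pair. `fake_fetch`, `compute_diff` and `run_loop` are hypothetical names. `fake_fetch` replaces the RPC call with invented numbers so the sketch runs offline; a real version would request `{rpc}/block` the way `query()` does and would not need `round_no`.

```python
from multiprocessing import Pool


def fake_fetch(name, round_no):
    # Hypothetical stand-in for the RPC call, with invented numbers:
    # returns (height, block_time_in_seconds) for `name` in round `round_no`.
    seconds_per_block = {"source-1": 6.0, "source-2": 5.0, "source-3": 7.0}
    return round_no, round_no * seconds_per_block[name]


def compute_diff(name, round_no, prev):
    """Return (name, diff, new_state) for one source.

    `prev` is the (height, block_time) pair this source produced in the
    previous round, supplied by the parent; it is None on the first round.
    """
    height, block_time = fake_fetch(name, round_no)
    if prev is None or prev[0] == height:
        # No earlier block to compare with, or no new block yet.
        return name, None, (height, block_time)
    return name, block_time - prev[1], (height, block_time)


def run_loop(rounds, names):
    state = {name: None for name in names}  # per-source state lives in the parent
    history = []
    with Pool(4) as pool:
        for round_no in range(1, rounds + 1):
            args = [(name, round_no, state[name]) for name in names]
            results = pool.starmap(compute_diff, args)
            for name, _, new_state in results:
                state[name] = new_state  # carried into the next round
            history.append({name: diff for name, diff, _ in results})
    return history


if __name__ == "__main__":
    for row in run_loop(3, ["source-1", "source-2", "source-3"]):
        print(row)
```

Because the state travels with the task arguments and return values, it no longer matters which worker picks up a given source in a given round.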
As far as I have learned, Manager() and the related helpers are about sharing values among processes, but I can't find any way for a process to use the data produced by the previous process (in the infinite `while True` loop). I know that the global variables declared above are the problem, but I haven't found a solution yet.
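Since Manager() comes up here, a minimal sketch of how a managed dict keyed by source name could let whichever worker handles a source read the value stored for it in the previous round. It reuses a hypothetical offline `fake_fetch` with invented numbers; `read_and_update` and `run_shared` are illustrative names, not part of the original code.

```python
from multiprocessing import Manager, Pool


def fake_fetch(name, round_no):
    # Hypothetical stand-in for the RPC call, with invented numbers.
    seconds_per_block = {"source-1": 6.0, "source-2": 5.0, "source-3": 7.0}
    return round_no * seconds_per_block[name]


def read_and_update(name, round_no, shared):
    """Diff against the value stored for `name` last round, then overwrite it."""
    block_time = fake_fetch(name, round_no)
    prev = shared.get(name)    # proxy call to the manager process
    shared[name] = block_time  # seen by whichever worker handles `name` next
    return name, None if prev is None else block_time - prev


def run_shared(rounds, names):
    with Manager() as manager, Pool(4) as pool:
        shared = manager.dict()  # keyed by source name
        history = []
        for round_no in range(1, rounds + 1):
            args = [(name, round_no, shared) for name in names]
            history.append(dict(pool.starmap(read_and_update, args)))
        return history


if __name__ == "__main__":
    for row in run_shared(3, ["source-1", "source-2", "source-3"]):
        print(row)
```

The read-then-write in `read_and_update` is not atomic. It is safe here only because each source name appears once per `starmap` call and the rounds run one after another. Every proxy access is also a round trip to the manager process, so passing the state through task arguments is typically cheaper.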
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
