How can I utilize multiple cores to run my shape_email function?
I'm trying to shape a large amount of data into a specific format. There are several million records and it's getting slow. Ideally I could use multiple cores to speed this up, but I can't figure out how. The data comes from MongoDB, and the slow part is mapping shape_map over the data. It is not possible to reduce the arguments passed to shape_email. Ideally I'd be able to map shape_email over my dataset directly, but it takes 3 arguments.
def shape_data(data, phish_sender_data, classifier_controller):
    """ Shapes data from mongo to be ready for pre gen
    """
    phish_senders = set(map(get_sender, phish_sender_data))

    def shape_map(email):
        return shape_email(email, phish_senders, classifier_controller)

    output = list(filter(lambda x: x != None, map(shape_map, data)))
    return output

def get_sender(i: dict):
    return i.get("_id")
I've attempted to use multiprocessing's Pool() to make it run faster, but it either fails to pickle the local objects or is no faster. The version below fails to pickle:
from multiprocessing import Pool

def shape_data(data, phish_sender_data, classifier_controller):
    """ Shapes data from mongo to be ready for pre gen
    """
    phish_senders = set(map(get_sender, phish_sender_data))

    def shape_map(email):
        return shape_email(email, phish_senders, classifier_controller)

    p = Pool()
    output = list(filter(lambda x: x != None, p.map(shape_map, data)))
    return output

def get_sender(i: dict):
    return i.get("_id")
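For context on the pickling failure: Pool.map has to pickle the function it sends to the workers, and the standard pickle module can only serialize functions that are reachable by name at module level. The sketch below checks this with pickle.dumps directly, without starting any processes. The shape_email body, the "sender" key, and the can_pickle helper are hypothetical stand-ins, since the real shape_email is not shown in the question.

```python
import pickle
from functools import partial


def shape_email(email, phish_senders, classifier_controller):
    # Hypothetical stand-in: the real shape_email is not shown in the question.
    sender = email.get("sender")
    if sender is None:
        return None
    return {"sender": sender, "is_phish": sender in phish_senders}


def make_nested(phish_senders, classifier_controller):
    # Same pattern as shape_map above: a function defined inside another function.
    def shape_map(email):
        return shape_email(email, phish_senders, classifier_controller)
    return shape_map


def can_pickle(obj):
    # Hypothetical helper: report whether pickle can serialize obj.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


nested = make_nested({"a@x.com"}, None)
# partial of a module-level function, with picklable bound arguments.
bound = partial(shape_email, phish_senders={"a@x.com"}, classifier_controller=None)

print("nested function picklable:", can_pickle(nested))
print("partial picklable:", can_pickle(bound))
```

When run as a script, the nested function should be reported as not picklable and the partial as picklable, provided every bound argument (including the real classifier_controller) is itself picklable.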
I've also tried the version below, which successfully runs the function I want, with multiple arguments, across multiple cores. But it's not faster! I can see all the cores working and my CPU working much harder, but it still takes the same amount of time.
from functools import partial
from multiprocessing import Pool, cpu_count

def shape_data(data, phish_sender_data, cla_controller):
    """ Shapes data from mongo to be ready for pre gen
    """
    phish_senders_set = set(map(get_sender, phish_sender_data))
    cores = cpu_count() // 2
    p = Pool(processes=cores)
    shape = partial(shape_email, phish_senders=phish_senders_set, classifier_controller=cla_controller)
    output = list(filter(lambda x: x != None, p.map(shape, data, chunksize=(len(data) // cores))))
    return output

def get_sender(i: dict):
    return i.get("_id")
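One thing that might be worth testing here, offered as a guess and not a diagnosis: every record and every result is pickled and copied between processes, and with chunksize set to len(data)//cores each worker gets a single huge batch. The sketch below hands the shared arguments to each worker once through the Pool initializer and streams the data in smaller chunks with imap. The names init_worker, shape_one, shape_data_parallel, the "sender" key, and the default chunksize of 1000 are all hypothetical, and shape_email is again a stand-in for the real function.

```python
from multiprocessing import Pool

# Per-process globals, filled in once by init_worker in each worker.
_phish_senders = None
_classifier_controller = None


def shape_email(email, phish_senders, classifier_controller):
    # Hypothetical stand-in: the real shape_email is not shown in the question.
    sender = email.get("sender")
    if sender is None:
        return None
    return {"sender": sender, "is_phish": sender in phish_senders}


def init_worker(phish_senders, classifier_controller):
    # Runs once per worker process, so the shared arguments are not
    # bundled with the function for every batch of tasks.
    global _phish_senders, _classifier_controller
    _phish_senders = phish_senders
    _classifier_controller = classifier_controller


def shape_one(email):
    # Module-level function, so it can be pickled by name.
    return shape_email(email, _phish_senders, _classifier_controller)


def shape_data_parallel(data, phish_senders, classifier_controller,
                        processes=2, chunksize=1000):
    with Pool(processes=processes, initializer=init_worker,
              initargs=(phish_senders, classifier_controller)) as pool:
        # imap yields results in input order, one chunk at a time.
        shaped = pool.imap(shape_one, data, chunksize=chunksize)
        return [row for row in shaped if row is not None]


if __name__ == "__main__":
    emails = [{"sender": "a@x.com"}, {"sender": None}, {"sender": "b@y.com"}]
    print(shape_data_parallel(emails, {"a@x.com"}, None, chunksize=1))
```

Whether this helps depends on how expensive shape_email is compared with the cost of pickling each record and result. Timing the serial version against this one on a sample of a few thousand records would show whether the work or the inter-process transfer dominates. Under the spawn start method, the arguments in initargs must be picklable.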
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
