Stop all active threads after urllib.request timeout has been reached
I am using ThreadPoolExecutor and urllib.request.urlopen(request, timeout=30) to spawn multiple threads, each responsible for sending a POST request to a DNS and waiting for the response. Depending on the request body, responses can take anywhere from 1 to 100s (and in a few cases up to 500s). I have a lot of requests to send and I'm trying to optimize for overall throughput, not for any individual request. I have a DeadLetter processor for all the requests that time out. In my main pipeline I only want to process the requests that take 30s or less to fulfill.
From what I can tell, my threads aren't moving on after 30s. Instead they continue to wait and then throw a timeout exception whenever data comes back after 30s. What I want is to stop waiting on all currently open threads at that point and move on to the next batch (and let the DeadLetter processor handle the slower requests in a separate process).
Here is my multithreading and request code:
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(pks)) as executor:
        future_to_url = {executor.submit(self.make_request, pk): pk for pk in pks}
        for future in concurrent.futures.as_completed(future_to_url):
            pk = int(future_to_url[future])
            try:
                data = json.loads(future.result().decode('utf-8'))
                if data:
                    analytics[pk] = data
                else:
                    analytics[pk] = {"error_msg": "ERROR: response is none for pk {}".format(pk)}
            except Exception as exc:
                analytics[pk] = {"error_msg": "worker generated an exception for pk {}: {}".format(pk, exc)}
    return analytics
And the make_request method that the threads call:
    def make_request(self, pk):
        data = dict(self.request_data)
        data['pk'] = pk
        encoded_data = parse.urlencode(data).encode('utf-8')
        req = request.Request(self.worker_uri, data=encoded_data, method='POST')
        with request.urlopen(req, timeout=30) as conn:
            return conn.read()
However, when I look at the logs, it appears that a new batch doesn't start until several minutes after the current one started. I do, however, see a lot of timeout errors (anything that gets returned after 30s). It seems as if the threads are waiting for all the requests to be fulfilled, only to say "we don't care that you came back, because you came back late, so we're calling you a timeout".
Can anyone help me modify my code so that at 30s it assumes everything that hasn't yet returned is a timeout, records it as such, and moves on to the next batch?
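One possible direction, sketched under assumptions rather than taken from the code above: give the whole batch a single deadline with concurrent.futures.wait(..., timeout=30), record whatever has not finished as a timeout, and shut the executor down without waiting. The function name run_batch and its make_request and deadline parameters are hypothetical, and cancel_futures needs Python 3.9 or later. Python cannot forcibly stop a running thread, so requests still in flight keep running in the background until their own urlopen call returns or raises; the sketch only stops waiting for them.

```python
import concurrent.futures
import json


def run_batch(pks, make_request, deadline=30.0):
    """Run make_request(pk) for every pk, giving the whole batch `deadline` seconds.

    Hypothetical helper: `make_request` is any callable returning UTF-8 JSON bytes.
    """
    analytics = {}
    # Deliberately no `with` block: leaving it calls shutdown(wait=True),
    # which blocks until every worker thread has finished.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(pks)))
    future_to_pk = {executor.submit(make_request, pk): pk for pk in pks}
    try:
        # Returns after `deadline` seconds at the latest, splitting the
        # futures into those that finished and those still pending.
        done, not_done = concurrent.futures.wait(future_to_pk, timeout=deadline)
    finally:
        # Do not wait for stragglers; cancel_futures (Python 3.9+) drops
        # work that has not started. Running threads are not interrupted.
        executor.shutdown(wait=False, cancel_futures=True)

    for future in done:
        pk = future_to_pk[future]
        try:
            data = json.loads(future.result().decode("utf-8"))
            if data:
                analytics[pk] = data
            else:
                analytics[pk] = {"error_msg": "ERROR: response is none for pk {}".format(pk)}
        except Exception as exc:
            analytics[pk] = {"error_msg": "worker generated an exception for pk {}: {}".format(pk, exc)}

    # Anything still pending at the deadline is recorded as a timeout.
    for future in not_done:
        pk = future_to_pk[future]
        analytics[pk] = {"error_msg": "timeout: no response for pk {} within {}s".format(pk, deadline)}
    return analytics
```

With this shape, the pipeline would call something like analytics = run_batch(pks, self.make_request) and get control back after roughly 30s at most. The leftover threads still occupy sockets until they finish, so keeping the per-request urlopen timeout as a backstop seems sensible.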
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
