'Stop Locust When Specified Number Of User Tasks Complete
In my scenario, I'm running Locust without the web UI. The command I'm using is
locust -f my_locust_file --no_web -c 20 -r 4 # as a hack I add -t 10s
This corresponds to a 4 users being hatched every second up to a total of 20 users.
My objective is to have each of the 20 locust users execute a task and I'd like the locust run to complete and exit when the last user's (20th user) task completes. The collected statistics should only include the response times associated with each task.
In this scenario, 5 tasks (user scenarios) have been identified which can be randomly associated with a user:
class UserScenarios(TaskSet):
tasks = [Sequence_One, ServerSequence, Components_Sequence, Embedded_Sequence, Connectivity_Sequence]
class MyLocust(HttpLocust):
def __init__(self):
super().__init__()
MyLocust.counter += 1
print(f"Counter = {MyLocust.counter}")
counter = 0
task_set = UserScenarios
wait_time = between(1, 1)
host = 'https://*****.com'
Each task (user scenario) corresponds to a different sequence of 3 or 4 pages that should be loaded in order. An example sanitized and simplified sequence consisting of 2 pages is:
class Sequence_One(TaskSequence):
@seq_task(1)
def get_task1(self):
response = self.client.get(url='https://****',
name='https://****',
timeout=30,
allow_redirects=False,
headers={...})
@seq_task(2)
def get_task2(self):
response = self.client.get(url='https://****',
name='https://****',
timeout=30,
allow_redirects=False,
headers={...})
Is there a way to stop the test after the 20th (nth) user task completes? If every task visits 4 pages for example, I want the test to terminate after the 20*4 = 80 page request is made. In fact only 80 total page requests should be made as part of this test.
My experience with this test is that page requests continue to be made after the last user task completes until I either manually stop the test or use a time limit which is a bit longer than the tasks actually need to complete.
Solution 1:[1]
I use locust 1.3+ StopLocust() is deprecated
My framework creates a list of tuples of the credentials and support variables for every user. I have stored all my user credentials, tokens, support file names etc in those tuples as part of list. (Actually that is automatically done before starting locust)
I import that list in locustfile
# creds is created before running locust file and can be stored outside or part of locust # file
creds = [('demo_user1', 'pass1', 'lnla'),
('demo_user2', 'pass2', 'taam9'),
('demo_user3', 'pass3', 'wevee'),
('demo_user4', 'pass4', 'avwew')]
class RegisteredUser(SequentialTaskSet)
def on_start(self):
self.credentials = creds.pop()
@task
def task_one_name(self):
task_one_commands
@task
def task_two_name(self):
task_two_commands
@task
def stop(self):
if len(creds) == 0:
self.user.environment.reached_end = True
self.user.environment.runner.quit()
class ApiUser(HttpUser):
tasks = [RegisteredUser]
host = 'hosturl'
I use self.credentials in tasks. I created stop function in my class.
Also, observe that RegisteredUser is inherited from SequentialTaskSet to run all tasks in sequence.
Solution 2:[2]
use
raise exit()
if you want to make a definite hits create a sequirtial task set and raise exit() raise exit will stop that user
#its a way around as nothing else seen to work for me.
Solution 3:[3]
In headless mode, if you want to stop the whole locust service, you can use os.kill
class UserTasks(TaskSet):
@task
def test_api(self):
gobal FINISHED_USER, FINISHED_REQUEST
with self.user.client.post(f"/{api_type}", json=<my data>, headers=<my headers>, catch_response=True) as response:
# do something
FINISHED_REQUEST += 1
if FINISHED_REQUEST == schedule_num:
# this user finish all its task, will go to on_stop
self.user._state = "stopping"
FINISHED_USER += 1
def on_stop(self):
global FINISHED_USER
# do something
# make sure its the last user who finishes its task
if FINISHED_USER == <your user num>:
os.kill(<your pid>, signal.SIGTERM)
class WebsiteUser(HttpUser):
"""
User class that does requests to the locust web server running on localhost
"""
host = "http://{}".format(<host>)
wait_time = constant_throughput(1)
tasks = [UserTasks]
Solution 4:[4]
You can stop a single user by raising locust.exception.StopUser exception:
from locust.exception import StopUser
class Scenario(TaskSequence):
@seq_task(1)
def task_1(self):
self.client.get("/going_somewhere")
@seq_task(2)
def task_2(self):
self.client.get("/going_somewhere_else")
@seq_task(3)
def done(self):
raise StopUser()
P.S. StopLocust exception mentioned in other answers was deprecated in favor of StopUser exception.
Solution 5:[5]
another way to work with selenium and metamask that could spare you this problem would be to start Chrome with a defined profile that you would use in your code - please read more here
I know it's not really the answer to your question, but this actually made my life easier.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Sachin Patil |
| Solution 2 | Ishan Handa |
| Solution 3 | 幽幽åçš„ç·å |
| Solution 4 | niekas |
| Solution 5 | nmduarte |
