'Locust - Run consecutive tests from script
As the title suggests I'm trying to run a sequence of tests one after the other.
I implemented two different possibilities:
- restart test when test state becomes STOPPED
rom locust import HttpUser, TaskSet, task, events, between
from gevent.lock import Semaphore
import gevent
all_users_spawned = Semaphore()
all_users_spawned.acquire()
from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, STATE_INIT, STATE_MISSING, MasterRunner, LocalRunner
def checker_stopped(environment):
if environment.runner.state == STATE_STOPPED:
environment.runner.start(10, 1, True)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
@environment.events.spawning_complete.add_listener
def on_spawning_complete(**kw):
all_users_spawned.release()
if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
print("Checker spawned")
gevent.spawn(checker_stopped, environment)
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Beginning test setup")
checker_stopped(environment)
else:
print("Started test from Master node")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
checker_stopped(environment)
print("Cleaning up test data")
else:
print("Stopped test from Master node")
class UserTasks(TaskSet):
def on_start(self):
all_users_spawned.wait()
self.wait()
@task
def index(self):
self.client.get("/")
class WebsiteUser(HttpUser):
host = "https://docs.locust.io"
wait_time = between(2, 5)
tasks = [UserTasks]
- Quit and start test at every iteration
Based on the example found in the Locust documentation "Using Locust as a library"
mport gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging
setup_logging("INFO", None)
class User(HttpUser):
wait_time = between(1, 3)
host = "https://docs.locust.io"
@task
def my_task(self):
self.client.get("/")
@task
def task_404(self):
self.client.get("/non-existing-path")
tot = 5
for i in range(tot):
# setup Environment and Runner
env = Environment(user_classes=[User])
env.create_local_runner()
# start a WebUI instance
env.create_web_ui("127.0.0.1", 8089)
# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))
# start a greenlet that save current stats to history
gevent.spawn(stats_history, env.runner)
print(f"\n\n\nRun {i} of {tot}\n\n\n")
# start the test
env.runner.start(1, spawn_rate=10)
# in 60 seconds stop the runner
gevent.spawn_later(20, lambda: env.runner.quit())
# wait for the greenlets
env.runner.greenlet.join()
# stop the web server for good measures
env.web_ui.stop()
The first idea doesn't restart the test after is stopped the first run. The second one doesn't follow the time given and doesn't reach an end.
I would like to know if there is a better way to implement this project or more suitable functions in order to achieve my goal.
Thank you for your time
EDITED 2022/04/27
As @Cyberwiz suggested, I'm using this script with the locust "as a library" feature:
import os
import time
import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history, StatsCSVFileWriter
class User(HttpUser):
wait_time = between(1, 3)
host = "https://docs.locust.io"
@task
def my_task(self):
self.client.get("/")
@task
def task_404(self):
self.client.get("/non-existing-path")
def start_locust(time_hour: int, time_min: int, time_sec: int, user: int, spawn_rate: int, csv_ext: str = ""):
# setup Environment and Runner
env = Environment(user_classes=[User])
env.create_local_runner()
# CSV writer
stats_path = os.path.join(os.getcwd(), "data "+csv_ext)
csv_writer = StatsCSVFileWriter(
environment=env,
base_filepath=stats_path,
full_history=True,
percentiles_to_report=[0.50, 0.95]
)
# start a WebUI instance
env.create_web_ui(host="127.0.0.1", port=8089, stats_csv_writer=csv_writer)
# start the test
env.runner.start(user_count=user, spawn_rate=spawn_rate)
# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))
print(env.stats.serialize_stats())
# start a greenlet that saves current stats to history
gevent.spawn(stats_history, env.runner)
# stop the runner in a given time
time_in_seconds = (time_hour * 60 * 60) + (time_min * 60) + time_sec
gevent.spawn_later(time_in_seconds, lambda: env.runner.quit())
gevent.spawn(csv_writer.stats_writer) # Writing all the stats to a CSV file
# wait for the greenlets
env.runner.greenlet.join()
# stop the web server for good measures
env.web_ui.stop()
if __name__ == "__main__":
tot = 5
for i in range(tot):
start = time.time()
print(f"Run {i} of {tot}")
start_locust(time_hour=0, time_min=0, time_sec=10, user=10, spawn_rate=2,csv_ext = f"{i}")
end = time.time()
duration = round((end - start) / 60, 2)
print("\nFinished the load test in " + str(duration) + " min")
And looking at the CSV files it seems to work but there are a few flaws:
- The web UI stays stuck at the first iteration
- The
gevent.spawn(stats_printer(env.stats))doesn't output anything. The instructionprint(env.stats.serialize_stats())shows an empty list.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
