'Locust - Run consecutive tests from script

As the title suggests I'm trying to run a sequence of tests one after the other.

I implemented two different possibilities:

  1. restart test when test state becomes STOPPED
rom locust import HttpUser, TaskSet, task, events, between

from gevent.lock import Semaphore
import gevent

all_users_spawned = Semaphore()
all_users_spawned.acquire()

from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, STATE_INIT, STATE_MISSING, MasterRunner, LocalRunner


def checker_stopped(environment):
    if environment.runner.state == STATE_STOPPED:
        environment.runner.start(10, 1, True)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):

    @environment.events.spawning_complete.add_listener
    def on_spawning_complete(**kw):
        all_users_spawned.release()

    if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
        print("Checker spawned")
        gevent.spawn(checker_stopped, environment)


@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    if not isinstance(environment.runner, MasterRunner):
        print("Beginning test setup")
        checker_stopped(environment)
    else:
        print("Started test from Master node")


@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    if not isinstance(environment.runner, MasterRunner):
        checker_stopped(environment)
        print("Cleaning up test data")
    else:
        print("Stopped test from Master node")


class UserTasks(TaskSet):
    def on_start(self):
        all_users_spawned.wait()
        self.wait()

    @task
    def index(self):
        self.client.get("/")


class WebsiteUser(HttpUser):
    host = "https://docs.locust.io"
    wait_time = between(2, 5)
    tasks = [UserTasks]
  1. Quit and start test at every iteration

Based on the example found in the Locust documentation "Using Locust as a library"

mport gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging

setup_logging("INFO", None)


class User(HttpUser):
    wait_time = between(1, 3)
    host = "https://docs.locust.io"

    @task
    def my_task(self):
        self.client.get("/")

    @task
    def task_404(self):
        self.client.get("/non-existing-path")

tot = 5
for i in range(tot):
    # setup Environment and Runner
    env = Environment(user_classes=[User])
    env.create_local_runner()

    # start a WebUI instance
    env.create_web_ui("127.0.0.1", 8089)

    # start a greenlet that periodically outputs the current stats
    gevent.spawn(stats_printer(env.stats))

    # start a greenlet that save current stats to history
    gevent.spawn(stats_history, env.runner)

    print(f"\n\n\nRun {i} of {tot}\n\n\n")
    # start the test
    env.runner.start(1, spawn_rate=10)

    # in 60 seconds stop the runner
    gevent.spawn_later(20, lambda: env.runner.quit())

    # wait for the greenlets
    env.runner.greenlet.join()

    # stop the web server for good measures
    env.web_ui.stop()

The first idea doesn't restart the test after is stopped the first run. The second one doesn't follow the time given and doesn't reach an end.

I would like to know if there is a better way to implement this project or more suitable functions in order to achieve my goal.

Thank you for your time


EDITED 2022/04/27

As @Cyberwiz suggested, I'm using this script with the locust "as a library" feature:

import os
import time
import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history, StatsCSVFileWriter


class User(HttpUser):
    wait_time = between(1, 3)
    host = "https://docs.locust.io"

    @task
    def my_task(self):
        self.client.get("/")

    @task
    def task_404(self):
        self.client.get("/non-existing-path")


def start_locust(time_hour: int, time_min: int, time_sec: int, user: int, spawn_rate: int, csv_ext: str = ""):
    # setup Environment and Runner
    env = Environment(user_classes=[User])
    env.create_local_runner()

    # CSV writer
    stats_path = os.path.join(os.getcwd(), "data "+csv_ext)
    csv_writer = StatsCSVFileWriter(
        environment=env,
        base_filepath=stats_path,
        full_history=True,
        percentiles_to_report=[0.50, 0.95]
    )

    # start a WebUI instance
    env.create_web_ui(host="127.0.0.1", port=8089, stats_csv_writer=csv_writer)

    # start the test
    env.runner.start(user_count=user, spawn_rate=spawn_rate)

    # start a greenlet that periodically outputs the current stats
    gevent.spawn(stats_printer(env.stats))
    print(env.stats.serialize_stats())

    # start a greenlet that saves current stats to history
    gevent.spawn(stats_history, env.runner)

    # stop the runner in a given time
    time_in_seconds = (time_hour * 60 * 60) + (time_min * 60) + time_sec
    gevent.spawn_later(time_in_seconds, lambda: env.runner.quit())

    gevent.spawn(csv_writer.stats_writer) # Writing all the stats to a CSV file

    # wait for the greenlets
    env.runner.greenlet.join()

    # stop the web server for good measures
    env.web_ui.stop()


if __name__ == "__main__":
    tot = 5
    for i in range(tot):
        start = time.time()
        print(f"Run {i} of {tot}")
        start_locust(time_hour=0, time_min=0, time_sec=10, user=10, spawn_rate=2,csv_ext = f"{i}")
        end = time.time()
        duration = round((end - start) / 60, 2)
        print("\nFinished the load test in " + str(duration) + " min")

And looking at the CSV files it seems to work but there are a few flaws:

  • The web UI stays stuck at the first iteration
  • The gevent.spawn(stats_printer(env.stats)) doesn't output anything. The instruction print(env.stats.serialize_stats()) shows an empty list.


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source