How to Run Distributed Load Tests with Python and Locust Using Remote Workers

Load testing is essential for ensuring your system can handle high traffic and stress conditions. Locust, a powerful open-source tool written in Python, makes creating and executing these tests straightforward. This guide explores how to set up and run distributed load tests with Locust using remote workers – an approach that allows you to simulate massive loads more efficiently.


What You’ll Learn

  • How to install and configure Locust across multiple machines
  • How to write efficient test scripts
  • How to execute distributed tests with remote workers
  • How to analyze and interpret test results
  • Tips for optimizing your load tests

Why Use Remote Workers for Load Testing?

When conducting load tests, a single machine often can’t generate enough traffic to adequately test robust systems. CPU, memory, and network limitations cap the number of virtual users you can simulate. Using remote workers (called slaves in older Locust versions), you can:

  • Distribute the load across multiple machines
  • Simulate a much larger number of concurrent users
  • Get more realistic results by testing from different network locations
  • Avoid having your test hardware become a bottleneck

Prerequisites

Before starting, make sure you have:

  • Python 3.6+ installed on all machines (master and workers)
  • SSH or similar access to the remote machines
  • Firewall configured to allow communication between machines
  • Permissions to install Python packages

Step 1: Installing Locust

First, install Locust on all machines that will participate in the test (both master and worker machines):

pip install locust

To verify the installation:

locust --version

Step 2: Creating Your Locust Test Script

Create a file called locustfile.py on the master machine with your test scenario. Here’s a basic example:

from locust import HttpUser, task, between

class WebsiteUser(HttpUser):
    wait_time = between(1, 5)  # Wait time between requests (1-5 seconds)
    
    @task
    def index_page(self):
        self.client.get("/")
        
    @task(3)  # This task will be chosen 3x more often than the others
    def view_product(self):
        self.client.get("/products/1")
        
    @task
    def add_to_cart(self):
        self.client.post("/cart", json={"product_id": 1, "quantity": 1})

Copy this file to all worker machines as well, keeping the same structure and location.
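Copying the file by hand gets tedious as the worker count grows. A small helper that builds the scp command for each worker can be scripted; the host list, username, and destination path below are placeholders:

```python
import subprocess

def scp_command(host: str, user: str = "username", dest: str = "~/locustfile.py") -> list[str]:
    """Build the scp invocation that copies locustfile.py to one worker.
    Host, user, and destination are placeholders for your environment."""
    return ["scp", "locustfile.py", f"{user}@{host}:{dest}"]

# To actually copy (requires SSH access to the workers):
# for host in ["192.168.1.101", "192.168.1.102"]:
#     subprocess.run(scp_command(host), check=True)
```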

Step 3: Setting Up the Distributed Environment

On the master machine:

Start Locust in master mode, specifying the communication port for workers:

locust -f locustfile.py --master --master-bind-host=0.0.0.0 --master-bind-port=5557

Important parameters:

  • --master: Defines this instance as the master node
  • --master-bind-host=0.0.0.0: Allows connections from any IP (use with caution in production environments)
  • --master-bind-port=5557: The port that workers will use to connect

On worker machines:

On each worker machine, run the command:

locust -f locustfile.py --worker --master-host=MASTER_IP --master-port=5557

Replace MASTER_IP with the IP address of the master machine.
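Before launching workers, it can save debugging time to confirm each worker can actually reach the master’s port (a firewall issue is the most common cause of workers silently failing to connect). A minimal standard-library sketch:

```python
import socket

def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (hypothetical master address):
# print(can_reach("192.168.1.100", 5557))
```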

Step 4: Automating Remote Worker Setup

To simplify the configuration of multiple remote workers, you can create a Python script that uses SSH to start workers automatically:

import paramiko
import time
import threading

# List of worker machines (IP, username, password)
workers = [
    {"host": "192.168.1.101", "user": "username", "password": "password"},
    {"host": "192.168.1.102", "user": "username", "password": "password"},
    {"host": "192.168.1.103", "user": "username", "password": "password"}
]

def start_worker(worker_info):
    """Start Locust worker on remote machine"""
    try:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=worker_info["host"],
            username=worker_info["user"],
            password=worker_info["password"]
        )
        
        # Execute worker command
        command = f"cd /path/to/test && locust -f locustfile.py --worker --master-host=MASTER_IP --master-port=5557"
        stdin, stdout, stderr = client.exec_command(command)
        
        # Keep the script running to keep SSH session active
        while True:
            line = stdout.readline()
            if not line:
                break
            print(f"Worker {worker_info['host']}: {line.strip()}")
            
    except Exception as e:
        print(f"Error connecting to worker {worker_info['host']}: {e}")
    finally:
        client.close()

# Start threads for each worker
threads = []
for worker in workers:
    thread = threading.Thread(target=start_worker, args=(worker,))
    thread.daemon = True
    thread.start()
    threads.append(thread)
    time.sleep(1)  # Brief pause to avoid overload

# Start master locally
import os
os.system("locust -f locustfile.py --master --master-bind-host=0.0.0.0 --master-bind-port=5557")

# After the master exits, wait for the worker threads to wind down
for thread in threads:
    thread.join()

Save this script as start_distributed_test.py and run it on the master machine:

python start_distributed_test.py

Security note: This example uses direct passwords in the script for simplicity. In production environments, use SSH keys or other more secure methods.
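With key-based authentication, the worker launch can also be done through the system ssh client instead of storing passwords. The sketch below only builds the command line; the key path and remote directory are placeholders:

```python
import subprocess

def worker_ssh_command(master_ip: str, host: str, user: str,
                       key_path: str = "~/.ssh/id_ed25519") -> list[str]:
    """Build an ssh invocation that starts a Locust worker using key-based
    auth. Key path and remote working directory are placeholders."""
    remote = (f"cd /path/to/test && locust -f locustfile.py --worker "
              f"--master-host={master_ip} --master-port=5557")
    return ["ssh", "-i", key_path, f"{user}@{host}", remote]

# Launch without blocking (one process per worker):
# subprocess.Popen(worker_ssh_command("MASTER_IP", "192.168.1.101", "username"))
```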

Step 5: Running the Test

Once the master and workers are connected, access the Locust web interface at:

http://MASTER_IP:8089

In the web interface:

  1. Enter the total number of users you want to simulate
  2. Define the spawn rate (how many users to create per second)
  3. Enter the target host (e.g., https://qadebug.com)
  4. Click “Start swarming”

Step 6: Monitoring the Test and Analyzing Results

The Locust web interface provides real-time statistics:

  • Statistics: Shows response times, RPS (Requests Per Second), failures, and other data per endpoint
  • Charts: Visualizes user count, RPS, and response times over time
  • Failures: Lists details about any failures that occurred
  • Exceptions: Logs exceptions in test scripts
  • Download Data: Allows exporting results in CSV formats for later analysis
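The exported CSVs can be post-processed with the standard library. A minimal sketch, assuming the stats file contains columns like "Name" and "Average Response Time" (the sample rows below are made up):

```python
import csv
import io

# Made-up excerpt of a Locust stats CSV export; real exports contain more
# columns, but "Name" and "Average Response Time" are the ones used here.
SAMPLE = """Type,Name,Request Count,Average Response Time
GET,/,1200,85.2
GET,/products/1,3600,142.7
POST,/cart,900,210.4
"""

def slowest_endpoints(csv_text: str, top: int = 3) -> list[tuple[str, float]]:
    """Return the `top` endpoints with the highest average response time."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    rows.sort(key=lambda r: float(r["Average Response Time"]), reverse=True)
    return [(r["Name"], float(r["Average Response Time"])) for r in rows[:top]]

print(slowest_endpoints(SAMPLE, top=2))
```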

Tips for Distributed Load Testing

Performance Optimization

  • Adjust execution mode: The --headless flag disables the web UI and starts the test directly from the command line; it applies to the master (workers never serve a UI):
locust -f locustfile.py --master --headless -u 500 -r 10 --expect-workers 3
  • Configure appropriate timeouts:
class WebsiteUser(HttpUser):
    @task
    def index_page(self):
        self.client.get("/", timeout=5.0)  # 5-second timeout
  • Use efficient resource management: The test_start event is a good place to reset shared state so repeated runs start clean:
# In locustfile.py
from locust import events

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    environment.stats.reset_all()  # clear stats left over from a previous run

Advanced Monitoring

  • Custom logger:
# In locustfile.py
import logging
from locust import events

@events.test_start.add_listener
def setup_logging(environment, **kwargs):
    if not environment.web_ui:
        logging.info("Test started (worker mode)")
  • Capture additional metrics:
from locust import events

@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, **kwargs):
    # Logic for endpoint-specific metrics
    if name == "/api/critical-endpoint":
        print(f"Critical endpoint called: {response_time}ms")

Practical Example: Progressive Load Test

Here’s an advanced example of locustfile.py that implements a progressive load test:

import random
import logging
from locust import HttpUser, task, between, events

class StressTestUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        """Run when a virtual user starts"""
        # Mock login
        self.client.post("/login", json={
            "username": f"test_user_{random.randint(1, 10000)}",
            "password": "test_password"
        })
        self.token = "simulated_auth_token"
    
    @task(3)
    def browse_products(self):
        # Simulate navigation with correlation
        category_id = random.randint(1, 5)
        
        # Request with authentication headers
        headers = {"Authorization": f"Bearer {self.token}"}
        
        # List products in a category
        response = self.client.get(f"/categories/{category_id}/products", 
                                 headers=headers,
                                 name="/categories/[id]/products")
        
        if response.status_code == 200:
            try:
                # Simulate selecting a specific product
                product_data = response.json()
                if product_data and "products" in product_data and len(product_data["products"]) > 0:
                    product_id = random.choice(product_data["products"])["id"]
                    
                    # View the selected product
                    self.client.get(f"/products/{product_id}", 
                                  headers=headers,
                                  name="/products/[id]")
            except Exception as e:
                logging.error(f"Error processing JSON response: {e}")
    
    @task(1)
    def add_to_cart_flow(self):
        # Add a product to cart
        product_id = random.randint(1, 100)
        quantity = random.randint(1, 3)
        
        self.client.post("/cart/items", 
                       json={"product_id": product_id, "quantity": quantity},
                       headers={"Authorization": f"Bearer {self.token}"},
                       name="/cart/items")
        
        # View cart after addition
        self.client.get("/cart", 
                      headers={"Authorization": f"Bearer {self.token}"},
                      name="/cart")
        
        # Simulate 20% chance of checkout
        if random.random() < 0.2:
            self.client.post("/checkout",
                           json={"payment_method": "credit_card", "address_id": 1},
                           headers={"Authorization": f"Bearer {self.token}"},
                           name="/checkout")

# Event listeners for detailed logging
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print(f"Starting test with {environment.runner.user_count} initial users")

@events.spawning_complete.add_listener
def on_spawning_complete(user_count, **kwargs):
    print(f"Spawn complete: {user_count} users")

Step 7: Finalizing the Test

To end the test:

  1. Click “Stop” in the Locust web interface
  2. Use Ctrl+C to terminate the master process
  3. Terminate remote worker processes (automatically if using the Python script or manually via SSH)

Conclusion

Distributed load testing with Locust and Python offers a powerful and flexible way to evaluate your application’s performance under real stress conditions. By following this guide, you can set up a robust distributed testing environment that allows you to:

  • Simulate extremely high user loads
  • Identify performance bottlenecks before they affect your real users
  • Validate your infrastructure’s scalability
  • Generate detailed metrics for continuous analysis and optimization

Remember that load tests should be conducted in controlled environments and never directed at production systems without proper authorization. With the power of remote workers, you can perform much more comprehensive and realistic tests, ensuring your application is ready for any scaling challenge it might face.

Want to further enhance your tests? Explore advanced Locust features like custom task weights, shape functions to simulate realistic traffic patterns, and integration with monitoring tools like Prometheus and Grafana for even more detailed analysis.

Additional Resources