Inference & Training

PyTorch Elastic with Run:ai

by Ekin Karabulut and Hagay Sharon
August 7, 2024

Distributed training is crucial for accelerating training times and handling models that can't fit into a single GPU. PyTorch simplifies this with Data Parallelism and various Model Parallelism techniques. When diving into distributed training using PyTorch, you'll frequently encounter PyTorch Elastic (Torch Elastic). This blog introduces PyTorch Elastic jobs, how they differ from regular distributed jobs, and how to leverage them as a Run:ai user.

Why Torch Elastic?

Torch Elastic is suitable for various scenarios, including:

  1. Spot instances might disappear anytime: When using spot instances that can be taken away at any time, you want your training to continue seamlessly. Torch Elastic allows for scaling up and down without interrupting your training or requiring manual intervention.
  2. Infrastructure failures: For critical workloads that must run without interruption, node-level failures like network connectivity loss and hardware or software errors can lead to the failure of the entire distributed training job. Elastic jobs can recover from infrastructure failures without waiting for a new node to become available.
  3. Over-quota systems: With an over-quota system, a training job can temporarily use idle resources from other teams and release them when those teams reclaim them.

Side Note: If you are new to distributed training with PyTorch, please check out our previous blogs for more information.

How does Torch Elastic work?

Understanding how Torch Elastic works begins with a look at standard distributed training with PyTorch. If you're familiar with distributed training, you know how to start a job using torchrun:

# Torch Distributed

torchrun \
    --nnodes=$NUM_NODES \
    --nproc-per-node=$NUM_TRAINERS \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
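
For context, the sketch below shows what a training script launched this way typically does with the environment variables torchrun sets for each worker (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT). It is a generic illustration, not the exact script used later in this guide:

import os

import torch
import torch.distributed as dist

def setup_distributed():
    # torchrun sets RANK, LOCAL_RANK and WORLD_SIZE for every worker it spawns
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # With no explicit init_method, init_process_group reads MASTER_ADDR,
    # MASTER_PORT, RANK and WORLD_SIZE from the environment (env:// rendezvous)
    dist.init_process_group(backend="nccl")

    print(f"Worker {dist.get_rank()}/{dist.get_world_size()} is using local GPU {local_rank}")
    return local_rank

if __name__ == "__main__":
    local_rank = setup_distributed()
    # ... build the model, wrap it in DistributedDataParallel, run the training loop ...
    dist.destroy_process_group()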

Moving from Fault-Tolerant to Elastic Jobs

PyTorch’s torchrun utility supports both fault-tolerant and elastic training capabilities, which are fundamentally different:

  • Fault-Tolerant Jobs: Imagine you're running a critical training job, and suddenly, one of the nodes fails. With fault-tolerant jobs, torchrun logs the error and attempts to restart all processes from the last checkpoint. However, it doesn't allow changes in the number of workers, so there is no elasticity. You can specify the maximum number of restarts using the --max-restarts flag:
# Fault-tolerant Torch Distributed

torchrun \
    --nnodes=$NUM_NODES \
    --nproc-per-node=$NUM_TRAINERS \
    --max-restarts=3 \
    --rdzv-id=$JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
  • Elastic Jobs: Now, consider a scenario where your training job needs to adapt to dynamic changes, such as adding or removing nodes (membership changes). This is where elastic jobs come into play. If a node becomes unavailable, torchrun will terminate and respawn processes on the remaining devices, ensuring the training continues from the latest checkpoint without manual intervention. The --max-restarts flag here defines the number of allowed failures or membership changes, while $MIN_SIZE and $MAX_SIZE specify the minimum and maximum number of workers (--nnodes=MIN_SIZE:MAX_SIZE). The MIN_SIZE parameter ensures that your workload will train on at least the specified number of workers, while the MAX_SIZE parameter sets the maximum number of workers that your job can allocate:
# Torch Elastic Distributed

torchrun \
    --nnodes=MIN_SIZE:MAX_SIZE \
    --nproc-per-node=TRAINERS_PER_NODE \
    --max-restarts=NUM_ALLOWED_FAILURES_OR_MEMBERSHIP_CHANGES \
    --rdzv-id=JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

The Elastic Training Flow

To illustrate how elastic training works, let’s walk through a typical scenario:

  1. Initiating the Elastic Run: Begin by specifying the minimum and maximum number of workers (--nnodes=MIN_SIZE:MAX_SIZE) and the number of allowed failures or membership changes (--max-restarts=NUM_ALLOWED_FAILURES_OR_MEMBERSHIP_CHANGES).
  2. Handling Membership Changes or Worker Failures: If a worker becomes unavailable due to infrastructure issues or reclaimed resources:
    1. PyTorch kills and restarts the worker group to rebalance the workload.
    2. It checks how many workers are still available and reranks them.
    3. It loads the latest checkpoint.
    4. It resumes training with the remaining nodes.
  3. Restart Conditions: This process will continue smoothly as long as the --max-restarts value is not exceeded.

This dynamic handling ensures minimal interruption and maximizes resource utilization.

Important Note: Checkpoints are a crucial part of Torch Elastic. As stated in the PyTorch documentation: “On failures or membership changes ALL surviving workers are killed immediately. Make sure to checkpoint your progress. The frequency of checkpoints should depend on your job’s tolerance for lost work.”

Do I Need to Change My Training Script?

No, you don’t! You can use the same training scripts. If you aren’t already checkpointing your progress, you just need to start saving checkpoints. Apart from that, simply add the necessary flags to torchrun.
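
If you aren’t checkpointing yet, the snippet below is a minimal sketch of the pattern to add. The checkpoint path and helper names are illustrative, not taken from the example code used later in this guide:

import os

import torch

# Illustrative path: the checkpoint must live on storage that all nodes can reach
CHECKPOINT_PATH = "/shared/checkpoint.pt"

def save_checkpoint(model, optimizer, epoch):
    # Let only rank 0 write the file to avoid concurrent writes from every worker
    # (if the model is wrapped in DistributedDataParallel, save model.module.state_dict())
    if int(os.environ.get("RANK", "0")) == 0:
        torch.save(
            {
                "epoch": epoch,
                "model_state": model.state_dict(),
                "optimizer_state": optimizer.state_dict(),
            },
            CHECKPOINT_PATH,
        )

def load_checkpoint(model, optimizer):
    # After an elastic restart, every surviving worker reloads the latest checkpoint
    if os.path.exists(CHECKPOINT_PATH):
        state = torch.load(CHECKPOINT_PATH, map_location="cpu")
        model.load_state_dict(state["model_state"])
        optimizer.load_state_dict(state["optimizer_state"])
        return state["epoch"] + 1  # resume from the next epoch
    return 0  # no checkpoint yet: start from the first epoch

# Inside the training loop (sketch):
# start_epoch = load_checkpoint(model, optimizer)
# for epoch in range(start_epoch, num_epochs):
#     train_one_epoch(...)
#     save_checkpoint(model, optimizer, epoch)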

Why Torch Elastic is a Game-Changer for Run:ai Users

Apart from the obvious benefits of elastic distributed training, you can leverage Run:ai’s scheduling capabilities with Torch Elastic. Consider this use case:

  • In a 4-GPU cluster, you have 2 projects: project-a and project-b. Each has 2 guaranteed GPUs and is allowed to go over-quota if resources are available.
  • Project-a wants to use the whole cluster’s resources when project-b is not using them, but still run on its guaranteed GPUs otherwise.

Elastic jobs are perfect here. Launch the job with a maximum of 4 replicas and a minimum of 2. Start training with 4 GPUs. If project-b starts using GPUs, the job stops briefly, scales down to project-a’s guaranteed quota, and resumes from the latest checkpoint without manual intervention. When project-b stops using its resources, the job scales back up to 4 replicas, accelerating training while keeping resources efficiently utilized.

Moreover, the Run:ai scheduler is aware of the “roles” of elastic workload pods, and prioritizes preempting PyTorch workers before preempting the master node. By intelligently managing preemption, the scheduler maintains the integrity of the training process even during scale-down events.

This approach ensures your training process is robust, scalable, and resource-efficient.

Step-by-Step Guide: Running Elastic Distributed Training on Run:ai

Prerequisites

Before diving into the distributed training setup, ensure that you have the following prerequisites in place:

  • Run:ai environment (v2.17 and later): Make sure you have access to a Run:ai environment with version 2.17 or a later release, including the CLI. Run:ai provides a comprehensive platform for managing and scaling deep learning workloads on Kubernetes.
  • Four nodes with a GPU each: For this tutorial, we will use a setup consisting of four nodes, each equipped with one GPU. However, you can scale up by adding more nodes or GPUs to suit your specific requirements.
  • Image Registry (e.g., Docker Hub Account): Prepare an image registry, such as a Docker Hub account, where you can store your custom Docker images for elastic distributed training.

We'll use the same use case explained above: we are working on project-a with 2 guaranteed GPUs and have over-quota permissions. If users from project-b are not using their resources, we want to scale our training job up to 4 GPUs. When they reclaim their guaranteed resources, we want our training job to scale down to our guaranteed quota without manual intervention and without losing any training progress.

In a previous blog, we shared a simple code example for distributed training using PyTorch. The script was already saving the checkpoints, so we will use the same scripts and setup for this blog. The only thing we need to change is the torchrun command in the launch.sh script.

#!/bin/bash

# MASTER_ADDR, MASTER_PORT and WORLD_SIZE are provided to each pod as environment variables
master_addr=$MASTER_ADDR
master_port=$MASTER_PORT
job_n=$WORLD_SIZE

LOGLEVEL="INFO" torchrun --nproc_per_node=1 --nnodes=2:${job_n} --max-restarts=3 --rdzv_endpoint=${master_addr}:${master_port} --rdzv_backend=c10d distributed.py --batch_size 8 1000 2

Compared to the previous blog's version of this command, there are a few changes:

  • LOGLEVEL="INFO": This prints node information at the beginning of training. You can remove it if you don’t need that output.
  • --nnodes=2:${job_n}: Since we will have a minimum of 2 GPUs and maximum of what is available to us, we will pass 2 as the MIN_SIZE and world size as the MAX_SIZE.
  • --max-restarts=3: We want to tolerate a maximum of 3 restarts of the training.

Building and Pushing the Docker Image

We will use the same Dockerfile as in the previous blog to build the Docker image and push it to our registry.

$ docker login -u YOUR-USER-NAME 
$ docker build -t YOUR-USER-NAME/distributed_training_pytorch .
$ docker push YOUR-USER-NAME/distributed_training_pytorch

Make sure to replace YOUR-USER-NAME with your actual Docker Hub username. These commands will build the image and push it to your Docker Hub repository. You can find the image that we created for this guide here.

Launching the Distributed Training on Run:ai

To start the elastic distributed training on Run:ai, ensure that you have the correct version of the CLI (v2.17 or later). To launch a distributed PyTorch training job, use the runai submit-dist pytorch command (the exact syntax may vary slightly between CLI versions). Here is the command to submit our job:

runai submit-dist pytorch --name elastic-workload --workers=3 --max-replicas 4 --min-replicas 2 -g 1 -i docker.io/ekink/elastic_pytorch

This command starts a distributed job with a minimum of 2 and a maximum of 4 pods, each running on a single GPU, depending on how many resources are available to us.

When we look at the UI, we can see that project-b has 2 workspaces running with 1 GPU each. So our elastic workload started running with 2 GPUs, which is our guaranteed quota.


When users from project-b stop their workspaces, we will see that the elastic workload will claim those released resources automatically and keep running the training from the latest checkpoint.


Final Words

Run:ai enhances Torch Elastic’s benefits by maximizing resource utilization through its over-quota system, enabling dynamic scaling without manual intervention, and simplifying management with minimal script modifications. This synergy allows for efficient and resilient training, leveraging idle resources and handling connectivity issues smoothly with Torch Elastic. By integrating Torch Elastic with Run:ai, you can accelerate your model training with optimal performance and reliability.