github horovod/horovod v0.20.0
Elastic Horovod, Ray integration, All-to-All, Gradient Predivide, CMake build system

latest releases: v0.28.1, v0.28.0, v0.27.0...
3 years ago

Elastic Horovod API + Spark Auto-Scaling (#1849, #1956)

Elastic training enables Horovod to scale up and down the number of workers dynamically at runtime, without requiring a restart or resuming from checkpoints saved to durable storage. With elastic training, workers can come and go from the Horovod job without interrupting the training process.

Support for auto-scaling can be added to any existing Horovod script with just a few modifications:

  1. Decorate retryable functions with @hvd.elastic.run.
  2. Track state that needs to be kept in sync across workers in a hvd.elastic.State object.
  3. Perform all Horovod collective operations (allreduce, allgather, broadcast, etc.) inside the retryable functions.

Here's an example for PyTorch:

import torch
import horovod.torch as hvd

hvd.init()
torch.cuda.set_device(hvd.local_rank())

model = ...
dataset = ...

@hvd.elastic.run
def train(state):
    for state.epoch in range(state.epoch, args.epochs + 1):
        dataset.set_epoch(state.epoch)
        dataset.set_batch_idx(state.batch_idx)
        for state.batch_idx, (data, target) in enumerate(dataset):
            state.optimizer.zero_grad()
            output = state.model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            state.optimizer.step()
            state.commit()

optimizer = optim.SGD(model.parameters(), lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

def on_state_reset():
    # adjust learning rate on reset
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr * hvd.size()

state = hvd.elastic.TorchState(model, optimizer, epoch=1, batch_idx=0)
state.register_reset_callbacks([on_state_reset])
train(state)

Run using horovodrun by specifying the minimum and maximum number of worker processes, as well as a "host discovery script" that will be used to find available workers to add at runtime:

$ horovodrun -np 8 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.sh python train.py

Elastic Horovod is supported natively with Spark auto-scaling using the hvd.spark.run_elastic API.

For more details, see Elastic Horovod.

Horovod on Ray (#2218)

Ray is a distributed execution framework that makes it easy to provision and scale distributed applications, and can now be used to execute Horovod jobs without needing to coordinate the workers by hand:

from horovod.ray import RayExecutor

# Start the Ray cluster or attach to an existing Ray cluster
ray.init()

# Start num_hosts * num_slots actors on the cluster
executor = RayExecutor(
    setting, num_hosts=num_hosts, num_slots=num_slots, use_gpu=True)

# Launch the Ray actors on each machine
# This will launch `num_slots` actors on each machine
executor.start()

# Using the stateless `run` method, a function can take in any args or kwargs
def train_fn():
    hvd.init()
    # Train the model on each worker here
    ...

# Execute the function on all workers at once
results = executor.run(train_fn)

executor.shutdown()

Horovod now also integrates with Ray Tune to scale up your hyperparameter search jobs. Check out the example here.

For more details, see Horovod on Ray.

All-to-All Operation (#2143)

The all-to-all collective can be described as a combination of a scatter and gather, where each worker will scatter a tensor to each worker, while also gathering scattered data from other workers. This type of collective communication can arise in model-parallel training strategies.

The hvd.alltoall function takes the form hvd.alltoall(tensor, splits=None),
where tensor is a multi-dimensional tensor of data to scattered and splits is an optional 1D tensor of integers with length equal to the number of workers, describing how to split and distribute tensor. splits is applied along the first dimension of tensor. If splits is not provided, an equal splitting is assumed, where the first dimension is divided by the number of workers.

The implementation supports TensorFlow, PyTorch, and MXNet using the MPI backend, the CUDA-aware MPI backend via HOROVOD_GPU_ALLTOALL=MPI, and the NCCL backend via HOROVOD_GPU_ALLTOALL=NCCL / HOROVOD_GPU_OPERATIONS=NCCL.

Gradient Predivide Factor (#1949)

We've added a gradient_predivide_factor parameter in the DistributedOptimizer, the purpose of which is to enable splitting the averaging before and after the allreduce. This can be useful in managing the numerical range for mixed precision computations.

The gradient_predivide_factor is applied as follows:

        If op == Average, gradient_predivide_factor splits the averaging
        before and after the sum. Gradients are scaled by
        1.0 / gradient_predivide_factor before the sum and
        gradient_predivide_factor / size after the sum. 

To facilitate this, additional arguments (prescale_factor and postscale_factor) have been added to the basic hvd.allreduce functions, enabling the definition of multiplicative factors to scale the tensors before and after the allreduce respectively. For efficiency, the pre and post-scaling is implemented in the Horovod backend on the fused tensor buffer, rather than through framework level operations. For GPU, this required a CUDA kernel implementation to scale the GPU buffer which in turn, required adding compilation of CUDA code to the current build infrastructure.

As an additional general benefit from these changes, gradient averaging in the optimizer can now be carried out within the Horovod backend on the fused tensor buffer using the postscale_factor argument, rather than on a tensor by tensor basis at the framework level, decreasing the overhead of each allreduce call.

CMake Build System (#2009)

CMake, previously used to compile the optional Gloo controller, is now required to install Horovod. This change introduces a number of exciting benefits for Horovod developers and users:

  • Much faster installation times through a parallel task build
  • Incremental builds (almost instantaneous build when developing and making small changes at a time)
  • Separation of the build config phase with the build phase (less overhead for repeated builds)
  • Reuse find_package modules provided by CMake for MPI, CUDA, etc. to better handle a range of environment configurations
  • Libraries can be built outside of the python build process (no longer requiring setup.py)
  • Flexibility for the build system (make, ninja, IDEs, etc.)

Detailed Changes

Added

  • Added bare-metal elastic mode implementation to enable auto-scaling and fault tolerance. (#1849)

  • Added Elastic Horovod support for Spark auto-scaling. (#1956)

  • Added All-to-All operation for TensorFlow, PyTorch, and MXNet. (#2143)

  • Added support for gradient_predivide_factor and averaging in Horovod backend. (#1949)

  • Added NCCL implementation of the allgather operation. (#1952)

  • Added HOROVOD_GPU_OPERATIONS installation variable to simplify enabling NCCL support for all GPU operations. (#1960)

  • Added TensorFlow implementation of SyncBatchNormalization layer. (#2075)

  • Added hvd.is_initialized() method. (#2020)

  • Added hvd.allgather_object function for TensorFlow, PyTorch, and MXNet. (#2166)

  • Added hvd.broadcast_object function for MXNet. (#2122)

  • Added label_shapes parameter to KerasEstimator and TorchEstimator. (#2140)

  • Added optional modelCheckPoint callback to KerasEstimator params. (#2124)

  • Added ssh_identity_file argument to horovodrun. (#2201)

  • Added support for horovodrun on kubeflow/mpi-job. (#2199)

  • Added Ray integration. (#2218)

Changed

  • Moved horovod.run.runner.run to horovod.run. (#2099)

  • HOROVOD_THREAD_AFFINITY accepts multiple values, one for every Horovod rank (#2131)

  • Migrated build system for native libraries to CMake (#2009)

Deprecated

  • HOROVOD_CCL_BGT_AFFINITY is deprected. Use HOROVOD_THREAD_AFFINITY instead (#2131)

Removed

  • Dropped support for Python 2. (#1954)

  • Dropped support for TensorFlow < 1.15. (#2169)

  • Dropped support for PyTorch < 1.2. (#2086)

Fixed

  • Fixed MXNet allgather implementation to correctly handle resizing the output buffer. (#2092)

  • Fixed Keras Spark Estimator incompatibility with TensorFlow 1.15 due to tf.autograph. (#2069)

  • Fixed API compatibility with PyTorch 1.6. (#2051)

  • Fixed Keras API compatibility with TensorFlow 2.4.0. (#2178)

  • Fixed allgather gradient for TensorFlow 2 in cases where the tensor shape is not known during graph construction. (#2121)

  • Fixed running using Gloo with an imbalanced number of workers per host. (#2212)

Don't miss a new horovod release

NewReleases is sending notifications on new releases.