Elastic Horovod API + Spark Auto-Scaling (#1849, #1956)
Elastic training enables Horovod to scale up and down the number of workers dynamically at runtime, without requiring a restart or resuming from checkpoints saved to durable storage. With elastic training, workers can come and go from the Horovod job without interrupting the training process.
Support for auto-scaling can be added to any existing Horovod script with just a few modifications:
- Decorate retryable functions with `@hvd.elastic.run`.
- Track state that needs to be kept in sync across workers in a `hvd.elastic.State` object.
- Perform all Horovod collective operations (allreduce, allgather, broadcast, etc.) inside the retryable functions.
Here's an example for PyTorch:
```python
import torch
import torch.nn.functional as F
import torch.optim as optim

import horovod.torch as hvd

hvd.init()

torch.cuda.set_device(hvd.local_rank())

# `model`, `dataset`, `lr`, and `args` are placeholders assumed to be defined elsewhere.
model = ...
dataset = ...

@hvd.elastic.run
def train(state):
    for state.epoch in range(state.epoch, args.epochs + 1):
        dataset.set_epoch(state.epoch)
        dataset.set_batch_idx(state.batch_idx)
        for state.batch_idx, (data, target) in enumerate(dataset):
            state.optimizer.zero_grad()
            output = state.model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            state.optimizer.step()
            state.commit()

optimizer = optim.SGD(model.parameters(), lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

def on_state_reset():
    # adjust learning rate on reset
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr * hvd.size()

state = hvd.elastic.TorchState(model, optimizer, epoch=1, batch_idx=0)
state.register_reset_callbacks([on_state_reset])
train(state)
```
Run using `horovodrun` by specifying the minimum and maximum number of worker processes, as well as a "host discovery script" that will be used to find available workers to add at runtime:

```
$ horovodrun -np 8 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.sh python train.py
```
Elastic Horovod is supported natively with Spark auto-scaling using the `hvd.spark.run_elastic` API.
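As a minimal sketch of that API (the keyword names `num_proc`, `min_np`, and `max_np` are assumptions based on the 0.20 signature; check `horovod.spark.run_elastic` in your installed version):

```python
import horovod.spark

def train_on_worker():
    # Runs on each Spark executor; initialize Horovod and build the elastic
    # state and training loop here, as in the PyTorch example above.
    import horovod.torch as hvd
    hvd.init()
    ...

# Launch elastic training on Spark with 8 initial workers, scaling between 4 and 12.
# Keyword names are assumptions, as noted above.
horovod.spark.run_elastic(train_on_worker, num_proc=8, min_np=4, max_np=12)
```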
For more details, see Elastic Horovod.
Horovod on Ray (#2218)
Ray is a distributed execution framework that makes it easy to provision and scale distributed applications, and can now be used to execute Horovod jobs without needing to coordinate the workers by hand:
```python
import ray
import horovod.torch as hvd

from horovod.ray import RayExecutor

# Start the Ray cluster or attach to an existing Ray cluster
ray.init()

# `setting`, `num_hosts`, and `num_slots` are assumed to be defined elsewhere.
# Start num_hosts * num_slots actors on the cluster
executor = RayExecutor(
    setting, num_hosts=num_hosts, num_slots=num_slots, use_gpu=True)

# Launch the Ray actors on each machine
# This will launch `num_slots` actors on each machine
executor.start()

# Using the stateless `run` method, a function can take in any args or kwargs
def train_fn():
    hvd.init()
    # Train the model on each worker here
    ...

# Execute the function on all workers at once
results = executor.run(train_fn)

executor.shutdown()
```
Horovod now also integrates with Ray Tune to scale up your hyperparameter search jobs. Check out the example here.
For more details, see Horovod on Ray.
All-to-All Operation (#2143)
The all-to-all collective can be described as a combination of a scatter and gather, where each worker will scatter a tensor to each worker, while also gathering scattered data from other workers. This type of collective communication can arise in model-parallel training strategies.
The `hvd.alltoall` function takes the form `hvd.alltoall(tensor, splits=None)`, where `tensor` is a multi-dimensional tensor of data to be scattered and `splits` is an optional 1D tensor of integers with length equal to the number of workers, describing how to split and distribute `tensor`. `splits` is applied along the first dimension of `tensor`. If `splits` is not provided, an equal splitting is assumed, where the first dimension is divided by the number of workers.
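As an illustrative sketch (PyTorch API assumed; the tensor shape and values are arbitrary), each worker below sends one row to every worker and receives one row from every worker:

```python
import torch
import horovod.torch as hvd

hvd.init()

# Each worker contributes hvd.size() rows; `splits` sends one row to each worker.
tensor = torch.arange(hvd.size() * 4, dtype=torch.float32).reshape(hvd.size(), 4)
splits = torch.ones(hvd.size(), dtype=torch.int32)

# The result concatenates the rows received from all workers along the first dimension.
received = hvd.alltoall(tensor, splits=splits)
```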
The implementation supports TensorFlow, PyTorch, and MXNet using the MPI backend, the CUDA-aware MPI backend via `HOROVOD_GPU_ALLTOALL=MPI`, and the NCCL backend via `HOROVOD_GPU_ALLTOALL=NCCL` / `HOROVOD_GPU_OPERATIONS=NCCL`.
Gradient Predivide Factor (#1949)
We've added a `gradient_predivide_factor` parameter to `DistributedOptimizer`, the purpose of which is to enable splitting the averaging before and after the allreduce. This can be useful in managing the numerical range for mixed precision computations.

The `gradient_predivide_factor` is applied as follows: if `op == Average`, `gradient_predivide_factor` splits the averaging before and after the sum. Gradients are scaled by `1.0 / gradient_predivide_factor` before the sum and by `gradient_predivide_factor / size` after the sum.
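For example, a minimal sketch with the PyTorch optimizer wrapper (assuming `model` and a base `optimizer` already exist); a factor of 2.0 pre-divides gradients by 2 before the sum and applies the remaining `2.0 / size` afterwards:

```python
import horovod.torch as hvd

hvd.init()

# Gradients are scaled by 1.0 / 2.0 before the allreduce sum and by
# 2.0 / hvd.size() after it, so the end result is still an average.
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=model.named_parameters(),
    gradient_predivide_factor=2.0)
```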
To facilitate this, additional arguments (`prescale_factor` and `postscale_factor`) have been added to the basic `hvd.allreduce` functions, enabling the definition of multiplicative factors to scale the tensors before and after the allreduce, respectively. For efficiency, the pre- and post-scaling is implemented in the Horovod backend on the fused tensor buffer, rather than through framework-level operations. For GPU, this required a CUDA kernel implementation to scale the GPU buffer, which in turn required adding compilation of CUDA code to the current build infrastructure.

As an additional general benefit of these changes, gradient averaging in the optimizer can now be carried out within the Horovod backend on the fused tensor buffer using the `postscale_factor` argument, rather than on a tensor-by-tensor basis at the framework level, decreasing the overhead of each allreduce call.
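A hedged sketch of the allreduce-level arguments (PyTorch API assumed; the factors below are arbitrary and together reproduce a plain average):

```python
import torch
import horovod.torch as hvd

hvd.init()

tensor = torch.randn(4)

# Scale by 0.5 before the sum and by 2.0 / size after it; the combined effect
# is equivalent to averaging the tensor across all workers.
result = hvd.allreduce(tensor, op=hvd.Sum,
                       prescale_factor=0.5,
                       postscale_factor=2.0 / hvd.size())
```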
CMake Build System (#2009)
CMake, previously used to compile the optional Gloo controller, is now required to install Horovod. This change introduces a number of exciting benefits for Horovod developers and users:
- Much faster installation times through a parallel task build
- Incremental builds (almost instantaneous build when developing and making small changes at a time)
- Separation of the build config phase from the build phase (less overhead for repeated builds)
- Reuse of `find_package` modules provided by CMake for MPI, CUDA, etc. to better handle a range of environment configurations
- Libraries can be built outside of the Python build process (no longer requiring `setup.py`)
- Flexibility for the build system (make, ninja, IDEs, etc.)
Detailed Changes
Added
- Added bare-metal elastic mode implementation to enable auto-scaling and fault tolerance. (#1849)
- Added Elastic Horovod support for Spark auto-scaling. (#1956)
- Added All-to-All operation for TensorFlow, PyTorch, and MXNet. (#2143)
- Added support for `gradient_predivide_factor` and averaging in Horovod backend. (#1949)
- Added NCCL implementation of the allgather operation. (#1952)
- Added `HOROVOD_GPU_OPERATIONS` installation variable to simplify enabling NCCL support for all GPU operations. (#1960)
- Added TensorFlow implementation of `SyncBatchNormalization` layer. (#2075)
- Added `hvd.is_initialized()` method. (#2020)
- Added `hvd.allgather_object` function for TensorFlow, PyTorch, and MXNet. (#2166)
- Added `hvd.broadcast_object` function for MXNet. (#2122)
- Added `label_shapes` parameter to KerasEstimator and TorchEstimator. (#2140)
- Added optional `modelCheckPoint` callback to KerasEstimator params. (#2124)
- Added `ssh_identity_file` argument to `horovodrun`. (#2201)
- Added support for `horovodrun` on `kubeflow/mpi-job`. (#2199)
- Added Ray integration. (#2218)
Changed
- Moved `horovod.run.runner.run` to `horovod.run`. (#2099)
- `HOROVOD_THREAD_AFFINITY` accepts multiple values, one for every Horovod rank (#2131)
- Migrated build system for native libraries to CMake (#2009)
Deprecated
- `HOROVOD_CCL_BGT_AFFINITY` is deprecated. Use `HOROVOD_THREAD_AFFINITY` instead (#2131)
Removed
- Dropped support for Python 2. (#1954)
- Dropped support for TensorFlow < 1.15. (#2169)
- Dropped support for PyTorch < 1.2. (#2086)
Fixed
- Fixed MXNet allgather implementation to correctly handle resizing the output buffer. (#2092)
- Fixed Keras Spark Estimator incompatibility with TensorFlow 1.15 due to `tf.autograph`. (#2069)
- Fixed API compatibility with PyTorch 1.6. (#2051)
- Fixed Keras API compatibility with TensorFlow 2.4.0. (#2178)
- Fixed allgather gradient for TensorFlow 2 in cases where the tensor shape is not known during graph construction. (#2121)
- Fixed running using Gloo with an imbalanced number of workers per host. (#2212)