# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
Multi-GPU with MovieLens: ETL and Training
Overview
NVIDIA Merlin is an open source framework to accelerate and scale end-to-end recommender system pipelines on GPU. In this notebook, we use NVTabular, Merlin’s ETL component, to scale feature engineering and pre-processing to multiple GPUs and then perform data-parallel distributed training of a neural network on multiple GPUs with PyTorch, Horovod, and NCCL.
The pre-requisites for this notebook are to be familiar with NVTabular and its API:
You can read more about NVTabular, its API and specialized dataloaders in Getting Started with Movielens notebooks.
You can read more about scaling NVTabular ETL in Scaling Criteo notebooks.
In this notebook, we will focus only on the new information related to multi-GPU training, so please check out the other notebooks first (if you haven’t already.)
Learning objectives
In this notebook, we learn how to scale ETL and deep learning training to multiple GPUs:
Learn to use larger than GPU/host memory datasets for ETL and training
Use multi-GPU or multi node for ETL with NVTabular
Use NVTabular dataloader to accelerate PyTorch pipelines
Scale PyTorch training with Horovod
Dataset
In this notebook, we use the MovieLens25M dataset. It is popular for recommender systems and is used in academic publications. The dataset contains 25M movie ratings for 62,000 movies given by 162,000 users. Many projects use only the user/item/rating information of MovieLens, but the original dataset provides metadata for the movies, as well.
Note: We are using the MovieLens 25M dataset in this example for simplicity, although the dataset is not large enough to require multi-GPU training. However, the functionality demonstrated in this notebook can be easily extended to scale recommender pipelines for larger datasets in the same way.
Tools
Horovod is a distributed deep learning framework that provides tools for multi-GPU optimization.
The NVIDIA Collective Communication Library (NCCL) provides the underlying GPU-based implementations of the allgather and allreduce cross-GPU communication operations.
Download and Convert
First, we will download and convert the dataset to Parquet. This section is based on 01-Download-Convert.ipynb.
Download
# External dependencies
import os
import pathlib
import cudf # cuDF is an implementation of Pandas-like Dataframe on GPU
from nvtabular.utils import download_file
# Resolve the data directory: the INPUT_DATA_DIR environment variable wins,
# otherwise fall back to a folder below the user's home directory.
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "~/nvt-examples/multigpu-movielens/data/")
BASE_DIR = pathlib.Path(INPUT_DATA_DIR).expanduser()
zip_path = BASE_DIR / "ml-25m.zip"

# Fetch the MovieLens 25M archive into BASE_DIR.
# NOTE(review): redownload=False presumably reuses an archive that is already
# on disk -- confirm against nvtabular.utils.download_file.
download_file(
    "http://files.grouplens.org/datasets/movielens/ml-25m.zip",
    zip_path,
    redownload=False,
)
downloading ml-25m.zip: 262MB [00:09, 28.3MB/s]
unzipping files: 100%|██████████| 8/8 [00:04<00:00, 1.91files/s]
Convert
# Convert the movie metadata to Parquet: split the pipe-separated genre
# string into a list column and drop the title column.
movies_csv = pathlib.Path(BASE_DIR, "ml-25m") / "movies.csv"
movies_parquet = movies_csv.with_name("movies_converted.parquet")

movies = cudf.read_csv(movies_csv)
movies["genres"] = movies["genres"].str.split("|")
movies = movies.drop("title", axis=1)
movies.to_parquet(movies_parquet)
Split into train and validation datasets
# Load the raw ratings; the timestamp column is dropped.
ratings = cudf.read_csv(pathlib.Path(BASE_DIR, "ml-25m", "ratings.csv"))
ratings = ratings.drop("timestamp", axis=1)

# Shuffle the dataset: sampling every row without replacement yields a
# random permutation of the rows.
ratings = ratings.sample(len(ratings), replace=False)

# Hold out the last 20% of the shuffled rows for validation.
# Slice at an explicit index instead of using -num_valid: with a negative
# offset, num_valid == 0 would turn into [:0] / [0:], leaving the training set
# empty and putting every row into validation.
num_valid = int(len(ratings) * 0.2)
split_at = len(ratings) - num_valid
train = ratings[:split_at]
valid = ratings[split_at:]

train.to_parquet(pathlib.Path(BASE_DIR, "train.parquet"))
valid.to_parquet(pathlib.Path(BASE_DIR, "valid.parquet"))
ETL with NVTabular
We finished downloading and converting the dataset. We will preprocess and engineer features with NVTabular on multiple GPUs. You can read more about:
NVTabular’s features and API in getting-started-movielens/02-ETL-with-NVTabular.ipynb.
scaling NVTabular ETL to multiple GPUs in scaling-criteo/02-ETL-with-NVTabular.ipynb.
Deploy a Distributed-Dask Cluster
This section is based on scaling-criteo/02-ETL-with-NVTabular.ipynb and multi-gpu-toy-example/multi-gpu_dask.ipynb
# Standard Libraries
import shutil
# External Dependencies
import cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import rmm
# NVTabular
import nvtabular as nvt
from nvtabular.io import Shuffle
from nvtabular.utils import device_mem_size
# Locations used by the ETL stage
input_path = pathlib.Path(BASE_DIR, "converted", "movielens")
dask_root = pathlib.Path(BASE_DIR, "test_dask")
dask_workdir = dask_root / "workdir"
output_path = dask_root / "output"
stats_path = dask_root / "stats"

# Start from empty Dask worker, stats and output directories: remove any
# directory left over from a previous run, then recreate it.
for scratch_dir in (dask_workdir, stats_path, output_path):
    if scratch_dir.is_dir():
        shutil.rmtree(scratch_dir)
    scratch_dir.mkdir(parents=True)

# Get device memory capacity
capacity = device_mem_size(kind="total")

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
visible_devices = "0,1"  # Select devices to place workers
# Spill GPU-worker memory to host at this fraction of device memory.
# Reduce it if spilling fails to prevent device memory errors.
device_spill_frac = 0.5
cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        CUDA_VISIBLE_DEVICES=visible_devices,
        local_directory=dask_workdir,
        device_memory_limit=capacity * device_spill_frac,
    )

# Create the distributed client
client = Client(cluster)
client
Client
|
Cluster
|
def _rmm_pool():
    """Re-initialize RMM in the calling process with a pool allocator.

    ``initial_pool_size=None`` keeps the default pool size.
    """
    rmm.reinitialize(pool_allocator=True, initial_pool_size=None)


# Initialize the RMM pool on ALL workers
client.run(_rmm_pool)
{'tcp://127.0.0.1:39189': None, 'tcp://127.0.0.1:41297': None}
Defining our Preprocessing Pipeline
This subsection is based on getting-started-movielens/02-ETL-with-NVTabular.ipynb. The only difference is that we initialize the NVTabular workflow using the LocalCUDACluster client with nvt.Workflow(output, client=client).
movies = cudf.read_parquet(pathlib.Path(BASE_DIR, "ml-25m") / "movies_converted.parquet")

# Join the movie metadata onto the (userId, movieId) pairs, then categorify
# the joined columns.
join_movies = nvt.ops.JoinExternal(movies, on=["movieId"])
joined = ["userId", "movieId"] >> join_movies
cat_features = joined >> nvt.ops.Categorify()

# Binarize the target: ratings above 3 become 1, everything else 0.
binarize_rating = nvt.ops.LambdaOp(lambda col: (col > 3).astype("int8"))
ratings = nvt.ColumnSelector(["rating"]) >> binarize_rating

output = cat_features + ratings
# USE client in NVTabular workflow
workflow = nvt.Workflow(output, client=client)
!rm -rf $BASE_DIR/train
!rm -rf $BASE_DIR/valid
# Wrap the raw Parquet splits as NVTabular datasets read in 100MB partitions
train_iter = nvt.Dataset([str(pathlib.Path(BASE_DIR) / "train.parquet")], part_size="100MB")
valid_iter = nvt.Dataset([str(pathlib.Path(BASE_DIR) / "valid.parquet")], part_size="100MB")

# Fit the workflow on the training split only, then save it to disk so the
# training script can load it later.
workflow.fit(train_iter)
workflow.save(pathlib.Path(BASE_DIR, "workflow"))

shuffle = Shuffle.PER_WORKER  # Shuffle algorithm
out_files_per_proc = 4  # Number of output files per worker

# Transform both splits with the fitted workflow and write each one to its
# own output directory (training first, then validation).
for split_name, split_data in (("train", train_iter), ("valid", valid_iter)):
    workflow.transform(split_data).to_parquet(
        output_path=pathlib.Path(BASE_DIR, split_name),
        shuffle=shuffle,
        out_files_per_proc=out_files_per_proc,
    )

# ETL is finished: shut the Dask client and cluster down
client.shutdown()
cluster.close()
/opt/conda/lib/python3.8/site-packages/distributed/worker.py:3560: UserWarning: Large object of size 1.90 MiB detected in task graph:
("('read-parquet-da3b70ae6b307343d9879321b01eda75' ... 79321b01eda75')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
warnings.warn(
Training with PyTorch on multiGPUs
In this section, we will train a PyTorch model with multi-GPU support. In the NVTabular v0.5 release, we added multi-GPU support for NVTabular dataloaders. We will modify the getting-started-movielens/03-Training-with-PyTorch.ipynb to use multiple GPUs. Please review that notebook, if you have questions about the general functionality of the NVTabular dataloaders or the neural network architecture.
NVTabular dataloader for PyTorch
We’ve identified that the dataloader is one bottleneck in deep learning recommender systems when training pipelines with PyTorch. The normal PyTorch dataloaders cannot prepare the next training batches fast enough and therefore, the GPU is not fully utilized.
We developed a highly customized tabular dataloader for accelerating existing pipelines in PyTorch. In our experiments, we see a speed-up by 9x of the same training workflow with NVTabular dataloader. NVTabular dataloader’s features are:
removing bottleneck of item-by-item dataloading
enabling larger than memory dataset by streaming from disk
reading data directly into GPU memory and removing CPU-GPU communication
preparing batch asynchronously in GPU to avoid CPU-GPU communication
supporting commonly used .parquet format
easy integration into existing PyTorch pipelines by using similar API
supporting multi-GPU training with Horovod
You can find more information on the dataloaders in our blogpost.
Using Horovod with PyTorch and NVTabular
The training script below is based on getting-started-movielens/03-Training-with-PyTorch.ipynb, with a few important changes:
We provide several additional parameters to the TorchAsyncItr class, including the total number of workers hvd.size(), the current worker’s id number hvd.rank(), and a function for generating random seeds seed_fn().
# Excerpt only: "..." stands for the arguments shared with the single-GPU example.
train_dataset = TorchAsyncItr(
    ...
    global_size=hvd.size(),  # total number of workers
    global_rank=hvd.rank(),  # id number of the current worker
    seed_fn=seed_fn,  # function for generating random seeds
)
The seed function uses Horovod to collectively generate a random seed that’s shared by all workers so that they can each shuffle the dataset in a consistent way and select partitions to work on without overlap. The seed function is called by the dataloader during the shuffling process at the beginning of each epoch:
def seed_fn():
    """Return a shuffle seed built collectively by all Horovod workers."""
    # Bound each worker's contribution so the sum over hvd.size() workers
    # stays below the torch.int maximum.
    upper_bound = torch.iinfo(torch.int).max // hvd.size()

    # Every worker draws its own random fragment ...
    fragment = torch.tensor(cupy.random.randint(0, upper_bound))

    # ... and the fragments of all workers are summed via allreduce.
    summed = hvd.allreduce(fragment, name="shuffle_seed", op=hvd.mpi_ops.Sum)

    return summed % upper_bound
We wrap the PyTorch optimizer with Horovod’s DistributedOptimizer class and scale the learning rate by the number of workers:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01 * lr_scaler)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
We broadcast the model and optimizer parameters to all workers with Horovod:
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
The rest of the script is the same as the MovieLens example in getting-started-movielens/03-Training-with-PyTorch.ipynb. In order to run it with Horovod, we first need to write it to a file.
%%writefile './torch_trainer.py'
import argparse
import glob
import os
from time import time
import cupy
import torch
import nvtabular as nvt
from nvtabular.framework_utils.torch.models import Model
from nvtabular.framework_utils.torch.utils import process_epoch
from nvtabular.loader.torch import DLDataLoader, TorchAsyncItr
# Horovod must be the last import to avoid conflicts
import horovod.torch as hvd # noqa: E402, isort:skip
def _column_list(value):
    """Split a comma-separated command-line value into a list of column names.

    Surrounding whitespace is stripped and empty entries are dropped, so
    ``"movieId, userId"`` becomes ``["movieId", "userId"]``.
    """
    return [name.strip() for name in value.split(",") if name.strip()]


# Command-line interface of the training script. Every option keeps its
# original default, so running the script without arguments is unchanged.
parser = argparse.ArgumentParser(description="Train a multi-gpu model with Torch and Horovod")
parser.add_argument("--dir_in", default=None, help="Input directory")
parser.add_argument("--batch_size", default=None, help="Batch size")
# The column options are parsed into lists: the rest of the script
# concatenates and indexes them as lists of column names, which fails on the
# plain string argparse returns by default.
parser.add_argument(
    "--cats", default=None, type=_column_list, help="Categorical columns (comma-separated)"
)
parser.add_argument(
    "--cats_mh",
    default=None,
    type=_column_list,
    help="Categorical multihot columns (comma-separated)",
)
parser.add_argument(
    "--conts", default=None, type=_column_list, help="Continuous columns (comma-separated)"
)
parser.add_argument(
    "--labels", default=None, type=_column_list, help="Label columns (comma-separated)"
)
# type=int: the epoch count is passed to range(), which rejects a string.
parser.add_argument("--epochs", default=1, type=int, help="Training epochs")
args = parser.parse_args()

# Start Horovod and pin this process to the GPU matching its local rank
hvd.init()
gpu_to_use = hvd.local_rank()
if torch.cuda.is_available():
    torch.cuda.set_device(gpu_to_use)

# Run configuration: command-line values win, otherwise use the defaults
BASE_DIR = os.path.expanduser(args.dir_in if args.dir_in else "./data/")
BATCH_SIZE = int(args.batch_size or 16384)  # Batch Size
CATEGORICAL_COLUMNS = args.cats or ["movieId", "userId"]  # Single-hot
CATEGORICAL_MH_COLUMNS = args.cats_mh or ["genres"]  # Multi-hot
NUMERIC_COLUMNS = args.conts or []

# Output from ETL-with-NVTabular: the transformed training files plus the
# saved workflow, from which the embedding table sizes are read.
train_pattern = os.path.join(BASE_DIR, "train", "*.parquet")
TRAIN_PATHS = sorted(glob.glob(train_pattern))
workflow_dir = os.path.join(BASE_DIR, "workflow/")
proc = nvt.Workflow.load(workflow_dir)
EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc)
def collate_fn(x):
    """Hand the batch back unchanged.

    TensorItrDataset returns a single batch of x_cat, x_cont, y, so there is
    nothing left to collate.
    """
    return x
# Seed with system randomness (or a static seed)
cupy.random.seed(None)


def seed_fn():
    """
    Generate consistent dataloader shuffle seeds across workers

    Reseeds each worker's dataloader each epoch to get a fresh shuffle
    that's consistent across workers.
    """
    # Bound each worker's contribution so the sum over hvd.size() workers
    # stays below the torch.int maximum.
    upper_bound = torch.iinfo(torch.int).max // hvd.size()

    # Every worker draws its own random fragment ...
    fragment = torch.tensor(cupy.random.randint(0, upper_bound))

    # ... and the fragments of all workers are summed via allreduce.
    summed = hvd.allreduce(fragment, name="shuffle_seed", op=hvd.mpi_ops.Sum)

    return summed % upper_bound
# NVTabular dataloader over the transformed training files. global_size and
# global_rank carry the total number of workers and this worker's id, and
# seed_fn supplies the shuffle seed.
train_data = nvt.Dataset(TRAIN_PATHS)
train_dataset = TorchAsyncItr(
    train_data,
    batch_size=BATCH_SIZE,
    cats=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
    conts=NUMERIC_COLUMNS,
    labels=["rating"],
    device=gpu_to_use,
    global_size=hvd.size(),
    global_rank=hvd.rank(),
    shuffle=True,
    seed_fn=seed_fn,
)

# The iterator already yields complete batches, so the wrapper gets
# batch_size=None and a pass-through collate function.
train_loader = DLDataLoader(
    train_dataset,
    batch_size=None,
    collate_fn=collate_fn,
    pin_memory=False,
    num_workers=0,
)
# Embedding table shapes as a (single-hot, multi-hot) pair of dicts.
# Every configured column gets an entry, so the model stays in step with the
# column lists handed to the dataloader instead of assuming exactly two
# single-hot columns and one multi-hot column.
EMBEDDING_TABLE_SHAPES_TUPLE = (
    {column: EMBEDDING_TABLE_SHAPES[column] for column in CATEGORICAL_COLUMNS},
    {column: EMBEDDING_TABLE_SHAPES[column] for column in CATEGORICAL_MH_COLUMNS},
)

# num_continuous follows the configured continuous columns (0 with the
# default, empty NUMERIC_COLUMNS) rather than a hard-coded 0.
model = Model(
    embedding_table_shapes=EMBEDDING_TABLE_SHAPES_TUPLE,
    num_continuous=len(NUMERIC_COLUMNS),
    emb_dropout=0.0,
    layer_hidden_dims=[128, 128, 128],
    layer_dropout_rates=[0.0, 0.0, 0.0],
).cuda()
# Scale the base learning rate (0.01) by the number of Horovod workers.
lr_scaler = hvd.size()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01 * lr_scaler)
# Broadcast the model parameters and optimizer state from rank 0 to all workers.
# NOTE(review): the optimizer state is broadcast before the optimizer is wrapped
# below; Horovod's own examples wrap first -- confirm this order is intentional.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Wrap the optimizer in Horovod's DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# Train for the requested number of epochs.
# Coerce to int first: a value given on the command line may arrive as a
# string, which range() rejects.
for epoch in range(int(args.epochs)):
    start = time()
    print(f"Training epoch {epoch}")
    train_loss, y_pred, y = process_epoch(
        train_loader,
        model,
        train=True,
        optimizer=optimizer,
    )
    # Join the other workers, then re-broadcast rank 0's parameters
    hvd.join(gpu_to_use)
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    print(f"Epoch {epoch:02d}. Train loss: {train_loss:.4f}.")
    hvd.join(gpu_to_use)

    # Per-epoch timing and dataloader throughput
    t_final = time() - start
    total_rows = train_dataset.num_rows_processed
    print(
        f"run_time: {t_final} - rows: {total_rows} - "
        f"epochs: {epoch} - dl_thru: {total_rows / t_final}"
    )

# Final join, then report completion from the local-rank-0 process only
hvd.join(gpu_to_use)
if hvd.local_rank() == 0:
    print("Training complete")
Overwriting ./torch_trainer.py
!horovodrun -np 2 python torch_trainer.py --dir_in $BASE_DIR --batch_size 16384
[1,0]<stdout>:Training epoch 0
[1,1]<stdout>:Training epoch 0
[1,1]<stdout>:Total batches: 610
[1,0]<stdout>:Total batches: 610
[1,0]<stdout>:Epoch 00. Train loss: 0.1983.
[1,1]<stdout>:Epoch 00. Train loss: 0.1982.
[1,1]<stdout>:run_time: 82.45111298561096 - rows: 1222 - epochs: 0 - dl_thru: 14.820903633080835
[1,0]<stdout>:run_time: 82.4517252445221 - rows: 1222 - epochs: 0 - dl_thru: 14.820793578011722
[1,0]<stdout>:Training complete