# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
http://developer.download.nvidia.com/compute/machine-learning/frameworks/nvidia_logo.png

Multi-GPU with MovieLens: ETL and Training

Overview

NVIDIA Merlin is an open-source framework to accelerate and scale end-to-end recommender system pipelines on GPUs. In this notebook, we use NVTabular, Merlin’s ETL component, to scale feature engineering and preprocessing to multiple GPUs and then perform data-parallel distributed training of a neural network on multiple GPUs with PyTorch, Horovod, and NCCL.

The prerequisite for this notebook is familiarity with NVTabular and its API.

In this notebook, we will focus only on the new information related to multi-GPU training, so please check out the other notebooks first (if you haven’t already.)

Learning objectives

In this notebook, we learn how to scale ETL and deep learning training to multiple GPUs:

  • Learn to use larger than GPU/host memory datasets for ETL and training

  • Use multiple GPUs or multiple nodes for ETL with NVTabular

  • Use NVTabular dataloader to accelerate PyTorch pipelines

  • Scale PyTorch training with Horovod

Dataset

In this notebook, we use the MovieLens25M dataset. It is popular for recommender systems and is used in academic publications. The dataset contains 25M movie ratings for 62,000 movies given by 162,000 users. Many projects use only the user/item/rating information of MovieLens, but the original dataset provides metadata for the movies, as well.

Note: We are using the MovieLens 25M dataset in this example for simplicity, although the dataset is not large enough to require multi-GPU training. However, the functionality demonstrated in this notebook can be easily extended to scale recommender pipelines for larger datasets in the same way.

Tools

Download and Convert

First, we will download and convert the dataset to Parquet. This section is based on 01-Download-Convert.ipynb.

Download

# External dependencies
import os
import pathlib

import cudf  # cuDF is an implementation of Pandas-like Dataframe on GPU

from nvtabular.utils import download_file

# Root directory for every file this notebook reads or writes; it can be
# overridden with the INPUT_DATA_DIR environment variable.
INPUT_DATA_DIR = os.environ.get(
    "INPUT_DATA_DIR", "~/nvt-examples/multigpu-movielens/data/"
)
BASE_DIR = pathlib.Path(INPUT_DATA_DIR).expanduser()
zip_path = BASE_DIR / "ml-25m.zip"
# Download over HTTPS so the archive cannot be altered in transit.
# NOTE(review): redownload=False presumably skips the download when the archive
# is already present — confirm against nvtabular.utils.download_file.
download_file(
    "https://files.grouplens.org/datasets/movielens/ml-25m.zip", zip_path, redownload=False
)
downloading ml-25m.zip: 262MB [00:09, 28.3MB/s]                            
unzipping files: 100%|██████████| 8/8 [00:04<00:00,  1.91files/s]

Convert

# Load the movie metadata, turn the pipe-delimited "genres" string into a list
# column, drop the "title" column, and store the result as Parquet.
movies = cudf.read_csv(BASE_DIR / "ml-25m" / "movies.csv")
movies["genres"] = movies["genres"].str.split("|")
movies = movies.drop(columns=["title"])
movies.to_parquet(BASE_DIR / "ml-25m" / "movies_converted.parquet")

Split into train and validation datasets

ratings = cudf.read_csv(pathlib.Path(BASE_DIR, "ml-25m", "ratings.csv"))
ratings = ratings.drop("timestamp", axis=1)

# Shuffle the rows: sampling every row without replacement yields a random
# permutation. No random_state is passed, so the split differs between runs.
ratings = ratings.sample(len(ratings), replace=False)

# Hold out 20% of the shuffled rows as the validation set. Slice on an explicit
# train length: the negative-index form ratings[:-num_valid] would produce an
# empty training set (and put every row in validation) if num_valid were 0.
num_valid = int(len(ratings) * 0.2)
num_train = len(ratings) - num_valid
train = ratings.iloc[:num_train]
valid = ratings.iloc[num_train:]

train.to_parquet(pathlib.Path(BASE_DIR, "train.parquet"))
valid.to_parquet(pathlib.Path(BASE_DIR, "valid.parquet"))

ETL with NVTabular

We have finished downloading and converting the dataset. Next, we will preprocess and engineer features with NVTabular on multiple GPUs. You can read more about multi-GPU ETL in the NVTabular documentation.

Deploy a Distributed-Dask Cluster

This section is based on scaling-criteo/02-ETL-with-NVTabular.ipynb and multi-gpu-toy-example/multi-gpu_dask.ipynb

# Standard Libraries
import shutil

# External Dependencies
import cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import rmm

# NVTabular
import nvtabular as nvt
from nvtabular.io import Shuffle
from nvtabular.utils import device_mem_size
# define some information about where to get our data
input_path = pathlib.Path(BASE_DIR, "converted", "movielens")
dask_workdir = pathlib.Path(BASE_DIR, "test_dask", "workdir")
output_path = pathlib.Path(BASE_DIR, "test_dask", "output")
stats_path = pathlib.Path(BASE_DIR, "test_dask", "stats")

# Start from empty Dask worker, stats and output directories so that files left
# over from a previous run cannot mix with this one.
for clean_dir in (dask_workdir, stats_path, output_path):
    if clean_dir.is_dir():
        shutil.rmtree(clean_dir)
    clean_dir.mkdir(parents=True)

# Get device memory capacity
capacity = device_mem_size(kind="total")

# Deploy a single-machine, multi-GPU cluster
protocol = "tcp"  # "tcp" or "ucx"
visible_devices = "0,1"  # Select devices to place workers
# Spill GPU-worker memory to host once this fraction of device memory is in use.
# Reduce it if spilling fails, to prevent device out-of-memory errors.
device_spill_frac = 0.5
cluster = None  # (Optional) Replace with an existing cluster or scheduler address
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        CUDA_VISIBLE_DEVICES=visible_devices,
        local_directory=dask_workdir,
        device_memory_limit=capacity * device_spill_frac,
    )

# Create the distributed client
client = Client(cluster)
client

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 125.84 GiB
def _rmm_pool():
    """Reinitialize RMM in the calling process with a pool allocator of default initial size."""
    pool_options = {"pool_allocator": True, "initial_pool_size": None}
    rmm.reinitialize(**pool_options)


# client.run executes the initializer on every Dask worker in the cluster.
client.run(_rmm_pool)
{'tcp://127.0.0.1:39189': None, 'tcp://127.0.0.1:41297': None}

Defining our Preprocessing Pipeline

This subsection is based on getting-started-movielens/02-ETL-with-NVTabular.ipynb. The only difference is that we initialize the NVTabular workflow using the LocalCUDACluster client with nvt.Workflow(output, client=client).

movies = cudf.read_parquet(pathlib.Path(BASE_DIR, "ml-25m", "movies_converted.parquet"))
# Join the movie metadata onto each (userId, movieId) pair, then encode the
# resulting columns as categoricals.
joined = ["userId", "movieId"] >> nvt.ops.JoinExternal(movies, on=["movieId"])
cat_features = joined >> nvt.ops.Categorify()
# Binarize the target: ratings above 3 become 1, everything else 0.
ratings = nvt.ColumnSelector(["rating"]) >> nvt.ops.LambdaOp(lambda col: (col > 3).astype("int8"))
output = cat_features + ratings
# USE client in NVTabular workflow
workflow = nvt.Workflow(output, client=client)

# Remove output from previous runs. This is done in Python rather than with an
# unquoted `rm -rf $BASE_DIR/...` shell escape, which would split a BASE_DIR
# containing spaces into several paths and delete the wrong ones.
shutil.rmtree(pathlib.Path(BASE_DIR, "train"), ignore_errors=True)
shutil.rmtree(pathlib.Path(BASE_DIR, "valid"), ignore_errors=True)

train_iter = nvt.Dataset([str(pathlib.Path(BASE_DIR, "train.parquet"))], part_size="100MB")
valid_iter = nvt.Dataset([str(pathlib.Path(BASE_DIR, "valid.parquet"))], part_size="100MB")
# Fit the workflow on the training split only, then persist it for the trainer.
workflow.fit(train_iter)
workflow.save(pathlib.Path(BASE_DIR, "workflow"))

shuffle = Shuffle.PER_WORKER  # Shuffle algorithm
out_files_per_proc = 4  # Number of output files per worker
for split_name, split_dataset in (("train", train_iter), ("valid", valid_iter)):
    workflow.transform(split_dataset).to_parquet(
        output_path=pathlib.Path(BASE_DIR, split_name),
        shuffle=shuffle,
        out_files_per_proc=out_files_per_proc,
    )

client.shutdown()
cluster.close()
/opt/conda/lib/python3.8/site-packages/distributed/worker.py:3560: UserWarning: Large object of size 1.90 MiB detected in task graph: 
  ("('read-parquet-da3b70ae6b307343d9879321b01eda75' ... 79321b01eda75')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(

Training with PyTorch on Multiple GPUs

In this section, we will train a PyTorch model with multi-GPU support. In the NVTabular v0.5 release, we added multi-GPU support for NVTabular dataloaders. We will modify the getting-started-movielens/03-Training-with-PyTorch.ipynb to use multiple GPUs. Please review that notebook, if you have questions about the general functionality of the NVTabular dataloaders or the neural network architecture.

NVTabular dataloader for PyTorch

We’ve identified the dataloader as one bottleneck in deep learning recommender systems when training pipelines with PyTorch. The standard PyTorch dataloaders cannot prepare the next training batches fast enough, so the GPU is not fully utilized.

We developed a highly customized tabular dataloader for accelerating existing pipelines in PyTorch. In our experiments, we see a 9x speed-up of the same training workflow with the NVTabular dataloader. The NVTabular dataloader’s features are:

  • removing bottleneck of item-by-item dataloading

  • enabling larger than memory dataset by streaming from disk

  • reading data directly into GPU memory and removing CPU-GPU communication

  • preparing batches asynchronously on the GPU to avoid CPU-GPU communication

  • supporting commonly used .parquet format

  • easy integration into existing PyTorch pipelines by using similar API

  • supporting multi-GPU training with Horovod

You can find more information on the dataloaders in our blog post.

Using Horovod with PyTorch and NVTabular

The training script below is based on getting-started-movielens/03-Training-with-PyTorch.ipynb, with a few important changes:

  • We provide several additional parameters to the TorchAsyncItr class, including the total number of workers hvd.size(), the current worker’s id number hvd.rank(), and a function for generating random seeds seed_fn().

    train_dataset = TorchAsyncItr(
        ...
        global_size=hvd.size(),
        global_rank=hvd.rank(),
        seed_fn=seed_fn,
    )
  • The seed function uses Horovod to collectively generate a random seed that’s shared by all workers so that they can each shuffle the dataset in a consistent way and select partitions to work on without overlap. The seed function is called by the dataloader during the shuffling process at the beginning of each epoch:

    def seed_fn():
        max_rand = torch.iinfo(torch.int).max // hvd.size()

        # Generate a seed fragment
        seed_fragment = cupy.random.randint(0, max_rand)

        # Aggregate seed fragments from all Horovod workers
        seed_tensor = torch.tensor(seed_fragment)
        reduced_seed = hvd.allreduce(seed_tensor, name="shuffle_seed", op=hvd.mpi_ops.Sum)

        return reduced_seed % max_rand
  • We wrap the PyTorch optimizer with Horovod’s DistributedOptimizer class and scale the learning rate by the number of workers:

    optimizer = torch.optim.Adam(model.parameters(), lr=0.01 * lr_scaler)
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
  • We broadcast the model and optimizer parameters to all workers with Horovod:

    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

The rest of the script is the same as the MovieLens example in getting-started-movielens/03-Training-with-PyTorch.ipynb. In order to run it with Horovod, we first need to write it to a file.

%%writefile './torch_trainer.py'

import argparse
import glob
import os
from time import time

import cupy
import torch

import nvtabular as nvt
from nvtabular.framework_utils.torch.models import Model
from nvtabular.framework_utils.torch.utils import process_epoch
from nvtabular.loader.torch import DLDataLoader, TorchAsyncItr

# Horovod must be the last import to avoid conflicts
import horovod.torch as hvd  # noqa: E402, isort:skip


parser = argparse.ArgumentParser(description="Train a multi-gpu model with Torch and Horovod")
parser.add_argument("--dir_in", default=None, help="Input directory")
# type=int so numeric options arrive as integers rather than strings
# (range() below would otherwise raise TypeError for a CLI-provided --epochs).
parser.add_argument("--batch_size", default=None, type=int, help="Batch size")
# Column options take one or more space-separated names and are parsed into
# lists, so they can be concatenated with the list defaults used later on.
parser.add_argument("--cats", default=None, nargs="+", help="Categorical columns")
parser.add_argument("--cats_mh", default=None, nargs="+", help="Categorical multihot columns")
parser.add_argument("--conts", default=None, nargs="+", help="Continuous columns")
parser.add_argument("--labels", default=None, nargs="+", help="Label columns")
parser.add_argument("--epochs", default=1, type=int, help="Training epochs")
args = parser.parse_args()

# Initialize Horovod before querying rank/size information from it.
hvd.init()

# Each process uses the GPU whose index equals its Horovod local rank.
gpu_to_use = hvd.local_rank()

if torch.cuda.is_available():
    torch.cuda.set_device(gpu_to_use)


# Configuration: command-line values take precedence over these defaults.
BASE_DIR = os.path.expanduser(args.dir_in or "./data/")
BATCH_SIZE = int(args.batch_size or 16384)
CATEGORICAL_COLUMNS = args.cats or ["movieId", "userId"]  # Single-hot
CATEGORICAL_MH_COLUMNS = args.cats_mh or ["genres"]  # Multi-hot
NUMERIC_COLUMNS = args.conts or []

# Output from ETL-with-NVTabular: the transformed training split in <BASE_DIR>/train.
TRAIN_PATHS = sorted(glob.glob(os.path.join(BASE_DIR, "train", "*.parquet")))
if not TRAIN_PATHS:
    # Fail early with an actionable message rather than passing an empty file
    # list on to nvt.Dataset.
    raise FileNotFoundError(
        f"No training Parquet files found in {os.path.join(BASE_DIR, 'train')!r}; "
        "run the ETL step first or pass the correct --dir_in."
    )

# Fitted workflow saved by the ETL step; used to derive embedding table sizes.
proc = nvt.Workflow.load(os.path.join(BASE_DIR, "workflow/"))

EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc)


def collate_fn(x):
    """Return the incoming batch unchanged.

    The DataLoader that uses this function is built with ``batch_size=None``,
    so each item it receives from TorchAsyncItr is presumably already a complete
    batch and needs no further collation.
    """
    batch = x
    return batch


# Seed CuPy's global RNG from system randomness (pass an int for a static seed).
cupy.random.seed(None)


def seed_fn():
    """
    Return a dataloader shuffle seed that is identical on every Horovod worker.

    The dataloader calls this when it reshuffles at the start of each epoch.
    Every worker draws its own random fragment and the fragments are summed
    across all workers, so all workers obtain the same seed while still
    getting a fresh shuffle each epoch.
    """
    # Cap each fragment so that the sum over hvd.size() workers stays within
    # the int32 range.
    upper = torch.iinfo(torch.int).max // hvd.size()

    local_part = torch.tensor(cupy.random.randint(0, upper))
    combined = hvd.allreduce(local_part, name="shuffle_seed", op=hvd.mpi_ops.Sum)

    return combined % upper


# Label columns: honor --labels when it is given, otherwise train on "rating".
# A bare string (the --labels value when it is parsed without nargs) is
# wrapped in a list so the loader always receives a list of column names.
LABEL_COLUMNS = args.labels or ["rating"]
if isinstance(LABEL_COLUMNS, str):
    LABEL_COLUMNS = [LABEL_COLUMNS]

# global_size/global_rank let each Horovod worker read its own partitions, and
# seed_fn keeps the shuffle consistent across workers.
train_dataset = TorchAsyncItr(
    nvt.Dataset(TRAIN_PATHS),
    batch_size=BATCH_SIZE,
    cats=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
    conts=NUMERIC_COLUMNS,
    labels=LABEL_COLUMNS,
    device=gpu_to_use,
    global_size=hvd.size(),
    global_rank=hvd.rank(),
    shuffle=True,
    seed_fn=seed_fn,
)
# batch_size=None turns off DataLoader-side batching; TorchAsyncItr presumably
# yields whole batches already, which collate_fn passes through unchanged.
train_loader = DLDataLoader(
    train_dataset, batch_size=None, collate_fn=collate_fn, pin_memory=False, num_workers=0
)


# (single-hot, multi-hot) embedding shapes, one entry per configured column.
# Built from the full column lists so that extra --cats/--cats_mh columns are
# not silently dropped (and a single column does not raise IndexError).
EMBEDDING_TABLE_SHAPES_TUPLE = (
    {col: EMBEDDING_TABLE_SHAPES[col] for col in CATEGORICAL_COLUMNS},
    {col: EMBEDDING_TABLE_SHAPES[col] for col in CATEGORICAL_MH_COLUMNS},
)

model = Model(
    embedding_table_shapes=EMBEDDING_TABLE_SHAPES_TUPLE,
    # Match the continuous inputs actually fed by the dataloader (0 by default).
    num_continuous=len(NUMERIC_COLUMNS),
    emb_dropout=0.0,
    layer_hidden_dims=[128, 128, 128],
    layer_dropout_rates=[0.0, 0.0, 0.0],
).cuda()

# Scale the learning rate linearly with the number of Horovod workers.
lr_scaler = hvd.size()

optimizer = torch.optim.Adam(model.parameters(), lr=0.01 * lr_scaler)

# Start every worker from rank 0's weights and optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Average gradients across workers on each optimizer step.
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# int() keeps this working when --epochs arrives from the command line as a string.
for epoch in range(int(args.epochs)):
    start = time()
    print(f"Training epoch {epoch}")
    train_loss, y_pred, y = process_epoch(train_loader,
                                          model,
                                          train=True,
                                          optimizer=optimizer)
    # Wait until every rank has finished the epoch, then resync weights from rank 0.
    hvd.join(gpu_to_use)
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    print(f"Epoch {epoch:02d}. Train loss: {train_loss:.4f}.")
    hvd.join(gpu_to_use)
    t_final = time() - start
    total_rows = train_dataset.num_rows_processed
    print(
        f"run_time: {t_final} - rows: {total_rows} - "
        f"epochs: {epoch} - dl_thru: {total_rows / t_final}"
    )


hvd.join(gpu_to_use)
# Use the global rank so exactly one process reports completion, even when the
# job spans several nodes (local_rank 0 exists once per node).
if hvd.rank() == 0:
    print("Training complete")
Overwriting ./torch_trainer.py
!horovodrun -np 2 python torch_trainer.py --dir_in $BASE_DIR --batch_size 16384
[1,0]<stdout>:Training epoch 0
[1,1]<stdout>:Training epoch 0
[1,1]<stdout>:Total batches: 610
[1,0]<stdout>:Total batches: 610
[1,0]<stdout>:Epoch 00. Train loss: 0.1983.
[1,1]<stdout>:Epoch 00. Train loss: 0.1982.
[1,1]<stdout>:run_time: 82.45111298561096 - rows: 1222 - epochs: 0 - dl_thru: 14.820903633080835
[1,0]<stdout>:run_time: 82.4517252445221 - rows: 1222 - epochs: 0 - dl_thru: 14.820793578011722
[1,0]<stdout>:Training complete