
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.

Scaling Criteo: Training with Merlin Models TensorFlow#

This notebook is created using the latest stable merlin-tensorflow container.

Overview#

The Criteo 1TB Click Logs dataset is popular in the recommender system community because it is one of the largest publicly available datasets. It contains ~1.3 TB of uncompressed click logs with over four billion samples spanning 24 days.

We will train Facebook’s deep learning recommendation model (DLRM) architecture with Merlin Models. We assume you are familiar with the Merlin Models API and features; otherwise, we recommend starting with the Merlin Models examples.

Learning objectives#

  • Train a DLRM architecture with Merlin Models on a large dataset

Training a DLRM model#

Let’s start with importing the libraries that we’ll use in this notebook.

import os

# Use TensorFlow's asynchronous CUDA memory allocator; this must be set before TensorFlow is imported.
os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

import glob
import merlin.models.tf as mm
from merlin.io.dataset import Dataset

from merlin.schema import Tags
import tensorflow as tf
/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.12) or chardet (3.0.4) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
2022-11-29 17:20:01.942873: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-11-29 17:20:06.579306: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0
2022-11-29 17:20:06.579506: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory:  -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:85:00.0, compute capability: 7.0
2022-11-29 17:20:06.582211: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 1
2022-11-29 17:20:06.582328: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 30655 MB memory:  -> device: 1, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:8a:00.0, compute capability: 7.0

Define the paths to the directories that contain the processed data.

input_path = os.environ.get("INPUT_DATA_DIR", "/raid/data/criteo/test_dask/output/")

# path to processed data
PATH_TO_TRAIN_DATA = sorted(glob.glob(os.path.join(input_path, "train", "*.parquet")))
PATH_TO_VALID_DATA = sorted(glob.glob(os.path.join(input_path, "valid", "*.parquet")))

PATH_TO_TRAIN_DATA, PATH_TO_VALID_DATA
(['/raid/data/criteo/test_dask/output/train/part_0.parquet',
  '/raid/data/criteo/test_dask/output/train/part_1.parquet'],
 ['/raid/data/criteo/test_dask/output/valid/part_0.parquet',
  '/raid/data/criteo/test_dask/output/valid/part_1.parquet'])

We define some hyperparameters for the model architecture and training.

BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 64 * 1024))
EMBEDDING_SIZE = 8
EPOCHS = 1
LR = 0.01
OPTIMIZER = tf.keras.optimizers.SGD(learning_rate=LR)

We use the Merlin Dataset object to initialize the dataloaders. It provides the dataset schema that is used to initialize the model architecture. The Merlin Models examples explain this in more detail.

train = Dataset(PATH_TO_TRAIN_DATA, part_mem_fraction=0.08)
valid = Dataset(PATH_TO_VALID_DATA, part_mem_fraction=0.08)
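
You can inspect the schema attached to the Dataset to confirm what the dataloader will feed to the model. Below is a minimal check; it assumes the preprocessing workflow tagged the Criteo columns with the standard categorical, continuous, and target tags.

# Inspect the schema the model architecture is built from.
# The exact column lists depend on how the NVTabular workflow tagged them.
print("categorical:", train.schema.select_by_tag(Tags.CATEGORICAL).column_names)
print("continuous: ", train.schema.select_by_tag(Tags.CONTINUOUS).column_names)
print("target:     ", train.schema.select_by_tag(Tags.TARGET).column_names)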

We initialize the DLRM architecture with Merlin Models.

model = mm.DLRMModel(
    train.schema,
    embedding_dim=EMBEDDING_SIZE,
    bottom_block=mm.MLPBlock([128, EMBEDDING_SIZE]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryClassificationTask(
        train.schema.select_by_tag(Tags.TARGET).column_names[0]
    )
)

We compile and train our model.

%%time

model.compile(optimizer=OPTIMIZER, run_eagerly=False)
model.fit(train,
          validation_data=valid,
          batch_size=BATCH_SIZE,
          epochs=EPOCHS
          )
2867/2867 [==============================] - 224s 72ms/step - loss: 0.1563 - precision: 0.0378 - recall: 2.0745e-04 - binary_accuracy: 0.9677 - auc: 0.5673 - regularization_loss: 0.0000e+00 - loss_batch: 0.1563 - val_loss: 0.1385 - val_precision: 0.0000e+00 - val_recall: 0.0000e+00 - val_binary_accuracy: 0.9680 - val_auc: 0.6279 - val_regularization_loss: 0.0000e+00 - val_loss_batch: 0.1405
CPU times: user 8min 32s, sys: 13min 13s, total: 21min 45s
Wall time: 3min 54s
<keras.callbacks.History at 0x7f58457b1c40>

Evaluate the model#

Finally, we can evaluate our model on the validation dataset.

eval_metrics = model.evaluate(valid, batch_size=BATCH_SIZE, return_dict=True)
eval_metrics
2787/2787 [==============================] - 89s 31ms/step - loss: 0.1385 - precision: 0.0000e+00 - recall: 0.0000e+00 - binary_accuracy: 0.9680 - auc: 0.6279 - regularization_loss: 0.0000e+00 - loss_batch: 0.1385
{'loss': 0.13853003084659576,
 'precision': 0.0,
 'recall': 0.0,
 'binary_accuracy': 0.9679937958717346,
 'auc': 0.6279259324073792,
 'regularization_loss': 0.0,
 'loss_batch': 0.14048266410827637}
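
Because the model is a standard Keras model, you can also generate raw click probabilities with predict. A brief sketch, assuming predict accepts a Merlin Dataset the same way fit and evaluate do:

# Predicted click probabilities for the validation set, returned as a NumPy array
# with one probability per example.
predictions = model.predict(valid, batch_size=BATCH_SIZE)
predictions[:5]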

Save the model#

We save the model to disk.

model.save(os.path.join(input_path, "dlrm"))
INFO:tensorflow:Unsupported signature for serialization: ((PredictionOutput(predictions=TensorSpec(shape=(None, 1), dtype=tf.float32, name='outputs/predictions'), targets=TensorSpec(shape=(None, 1), dtype=tf.float32, name='outputs/targets'), positive_item_ids=None, label_relevant_counts=None, valid_negatives_mask=None, negative_item_ids=None, sample_weight=None), <tensorflow.python.framework.func_graph.UnknownArgument object at 0x7f57c5f8ecd0>), {}).
WARNING:absl:Function `_wrapped_model` contains input name(s) C1, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C2, C20, C21, C22, C23, C24, C25, C26, C3, C4, C5, C6, C7, C8, C9, I1, I10, I11, I12, I13, I2, I3, I4, I5, I6, I7, I8, I9 with unsupported characters which will be renamed to c1, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c2, c20, c21, c22, c23, c24, c25, c26, c3, c4, c5, c6, c7, c8, c9, i1, i10, i11, i12, i13, i2, i3, i4, i5, i6, i7, i8, i9 in the SavedModel.
WARNING:absl:Found untraced functions such as train_compute_metrics, model_context_layer_call_fn, model_context_layer_call_and_return_conditional_losses, output_layer_layer_call_fn, output_layer_layer_call_and_return_conditional_losses while saving (showing 5 of 161). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: /raid/data/criteo/test_dask/output/dlrm/assets
INFO:tensorflow:Assets written to: /raid/data/criteo/test_dask/output/dlrm/assets
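
The model is written in the TensorFlow SavedModel format, so it can be restored later for further evaluation or serving. Below is a sketch of reloading it with the plain Keras loader; whether this resolves all Merlin custom layers without extra arguments depends on the Merlin Models version, so treat it as an assumption rather than a guarantee.

# Reload the SavedModel written above and re-run evaluation as a sanity check.
reloaded = tf.keras.models.load_model(os.path.join(input_path, "dlrm"))
reloaded.evaluate(valid, batch_size=BATCH_SIZE, return_dict=True)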

Summary#

We trained Facebook’s popular DLRM architecture on the large Criteo dataset with only a handful of commands.

Next steps#

The next step is to deploy the NVTabular workflow and DLRM model to production.

If you are interested in exploring other architectures and in training models with Merlin Models, we recommend checking out the Merlin Models examples.