NVIDIA Merlin on Microsoft’s News Dataset (MIND)

Overview

In this tutorial notebook, we will use the Microsoft News Dataset (MIND) to demonstrate NVTabular for ETL and HugeCTR for training Deep Neural Network models to build a Recommender System.

The MIND dataset contains 15M impressions generated by 1M users over 160k news articles. Our goal in this Jupyter notebook is to train a model that can predict whether a user will click on a news article or not.

To build the Recommender System, we will first clean and pre-process the data, then develop simple time-based features as well as more complex target and count encoded features, and finally train and evaluate a Deep Learning Recommendation Model (DLRM).

Please remember to run this Jupyter notebook in the merlin-training:22.04 Docker container.

Step 1: Import libraries and create directories

# Install packages required for this notebook
!pip install tqdm graphviz
!apt install wget unzip graphviz -y
import time, glob, shutil, sys, os, pickle, json
from tqdm import tqdm

import cupy as cp          # CuPy is an implementation of NumPy-compatible multi-dimensional array on GPU
import cudf                # cuDF is an implementation of Pandas-like Dataframe on GPU
import rmm                 # library for pre-allocating memory on GPU

import numpy as np

# NVTabular is the core library we will use here for feature engineering/preprocessing on GPU
from nvtabular.ops import Operator
import nvtabular as nvt
from nvtabular.utils import device_mem_size

# Dask is the backend job scheduler used by NVTabular
import dask   
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.distributed import wait
from dask.utils import parse_bytes
from dask.delayed import delayed

It is often a good idea to set aside (fast) dedicated disk space for Dask workers to spill data and write logging information. To keep things simple, we will perform all IO within a single BASE_DIR for this example. Feel free to set this variable yourself.

# Define "fast" root directory for this example
BASE_DIR = os.environ.get("BASE_DIR", "./basedir")

# Define worker/output directories
dask_workdir = os.path.join(BASE_DIR, "workdir")

# Directory to store the raw downloaded dataset
data_input_path = os.path.join(BASE_DIR, "dataset")
data_path_train = os.path.join(data_input_path, "train")
data_path_valid = os.path.join(data_input_path, "valid")

# Directory to store NVTabular's processed dataset
data_output_path = os.path.join(BASE_DIR, "processed_nvt")
output_train_path = os.path.join(data_output_path, "train")
output_valid_path = os.path.join(data_output_path, "valid")

# Directory to store HugeCTR's train configurations and weights
config_output_path = os.path.join(BASE_DIR, "configs")
weights_path = os.path.join(BASE_DIR, "weights")

#Creating and cleaning our worker/output directories
try:
    # Ensure BASE_DIR exists
    if not os.path.isdir(BASE_DIR):
        os.mkdir(BASE_DIR)

    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.mkdir(dask_workdir)

    # Make sure we have a clean path for downloading dataset and preprocessing
    if os.path.isdir(data_input_path):
        shutil.rmtree(data_input_path)
    os.mkdir(data_input_path)
    os.mkdir(data_path_train)
    os.mkdir(data_path_valid)

    # Make sure we have a clean output path
    if os.path.isdir(data_output_path):
        shutil.rmtree(data_output_path)
    os.mkdir(data_output_path)
    os.mkdir(output_train_path)
    os.mkdir(output_valid_path)
    
    # Make sure we have a clean configs and weights path
    if os.path.isdir(config_output_path):
        shutil.rmtree(config_output_path)
    os.mkdir(config_output_path)    
        
    if os.path.isdir(weights_path):
        shutil.rmtree(weights_path)
    os.mkdir(weights_path)

except OSError:
    print ("Creation of the directories failed")
else:
    print ("Successfully created the directories")

The following directory structure has been created and will be used to store everything related to this tutorial:

basedir
  |— workdir
  |— dataset
    |— train
    |— valid
  |— processed_nvt
    |— train
    |— valid
  |— configs
  |— weights

Step 2: Deploy a Distributed-Dask cluster

# Check the GPUs that are available to this notebook
!nvidia-smi

Initialize Dask GPU Cluster

NUM_GPUS = [0,1,2,3] # Set this to the GPU IDs that are observed from the above cell

# Dask dashboard
dashboard_port = "8787"

# Deploy a single-machine multi-GPU cluster
protocol = "tcp"             # "tcp" or "ucx"
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Detect devices to place workers
device_spill_frac = 0.9      # Spill GPU-Worker memory to host at this limit.
                             # Reduce if spilling fails to prevent
                             # device memory errors.

# Get device memory capacity
capacity = device_mem_size(kind="total") 

# Check if any device memory is already occupied (optional; requires
# `import warnings` and `from nvtabular.utils import _pynvml_mem_size`)
"""
for dev in visible_devices.split(","):
    fmem = _pynvml_mem_size(kind="free", index=int(dev))
    used = (capacity - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")
"""

cluster = None               # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol = protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES = visible_devices,
        device_memory_limit = capacity * device_spill_frac,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
    )

# Create the distributed client
client = Client(cluster)
client
# Initialize RMM pool on ALL workers
def _rmm_pool():
    rmm.reinitialize(
        pool_allocator=True,
        initial_pool_size=None, # Use default size
    )
    
client.run(_rmm_pool)

Step 3: Download & explore MIND dataset

The MIcrosoft News Dataset (MIND) is a large-scale dataset for news recommendation research. It was collected from the anonymized behavior logs of the Microsoft News website.

Please read and accept the Microsoft Research License Terms before downloading.

Let’s download the train and validation set, and unzip them to their respective directories.

!wget https://mind201910small.blob.core.windows.net/release/MINDlarge_train.zip
!wget https://mind201910small.blob.core.windows.net/release/MINDlarge_dev.zip
!unzip MINDlarge_train.zip -d $BASE_DIR/dataset/train
!unzip MINDlarge_dev.zip -d  $BASE_DIR/dataset/valid

The MIND dataset for news recommendation was collected from the anonymized behavior logs of the Microsoft News website. To protect user privacy, each user was de-linked from the production system by securely hashing them into an anonymized ID. The MIND team randomly sampled 1M users who had at least 5 news clicks between October 12 and November 22, 2019 (6 weeks).

Microsoft provides train, validation and test sets of this data, but we are going to use only the train and validation sets for this tutorial.

Dataset format

Each set of this data contains the following 4 files:

  1. behaviors.tsv - The click history and impression logs of users

  2. news.tsv - Details of news articles mapped with the news ID

  3. entity_embedding.vec - The embeddings of entities in news extracted from knowledge graph

  4. relation_embedding.vec - The embeddings of relations between entities extracted from knowledge graph

Let’s take a look at both TSV files and understand how we can utilize them for our Recommender System.
Note - For the ease of this tutorial, we ignore the embeddings provided by the MIND team.

Behaviors data

behaviors_train = cudf.read_csv(os.path.join(data_path_train , 'behaviors.tsv'), 
                                header=None, 
                                sep='\t',)
behaviors_train.head()

Each row in this data file represents one instance of an impression generated by the user. The columns of behaviors data are represented as:

[Impression ID] [User ID] [Time until when Impression Recorded] [User Click History] [Impression News]

Column 0: Impression ID (int64)
This is the ID of the impression generated.
e.g. 1,2,3,4,5

Column 1: User ID (string)
The anonymous ID of a user who has generated that impression.
e.g. U89, U395, U60005, U3965770

Column 2: Time (timestamp)
The impression time with format MM/DD/YYYY HH:MM:SS AM/PM
This is the point in time up to which the user’s impressions have been captured.

Column 3: History (string)
The news click history of this user before this impression. The clicked news articles are ordered by time.
e.g. N106403 N71977 N97080 N102132 N97212 N121652

Column 4: Impressions (string)
List of news displayed to the user and user’s click behaviors on them (1 for click and 0 for non-click).
e.g. N129416-0 N26703-1 N120089-1 N53018-0 N89764-0 N91737-0 N29160-0
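
For instance, a single impressions string can be decomposed into (news ID, label) pairs with plain Python; this is just a quick illustration using the example values above, not part of the pipeline:

# Parse one impressions string into (news ID, clicked) pairs
impressions = "N129416-0 N26703-1 N120089-1 N53018-0 N89764-0 N91737-0 N29160-0"

pairs = [item.rsplit("-", 1) for item in impressions.split()]
clicked = [news for news, label in pairs if label == "1"]
print(pairs[:2])   # [['N129416', '0'], ['N26703', '1']]
print(clicked)     # ['N26703', 'N120089']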

The corresponding details for the news IDs in the history and impressions columns can be found in the news.tsv file.

For more details, refer to the official MIND dataset description.

Let’s reload the data with their respective column names.

behaviors_columns = ['impression_id', 'uid', 'time', 'history', 'impressions']
behaviors_train = cudf.read_csv(os.path.join(data_path_train , 'behaviors.tsv'), 
                          header=None, 
                          names=behaviors_columns,
                    sep='\t',)
behaviors_train.head()
behaviors_valid = cudf.read_csv(os.path.join(data_path_valid , 'behaviors.tsv'), 
                          header=None, 
                          names=behaviors_columns,
                    sep='\t',)
behaviors_valid.head()

News data

news_train = cudf.read_csv(os.path.join(data_path_train , 'news.tsv'), 
                          header=None, 
                          sep='\t',)
news_train.head()

Each row in this data file represents a news article and its attributes. The columns of this data file are:

[News ID] [Category] [Subcategory] [News Title] [News Abstract] [News Url] [Entities in News Title] [Entities in News Abstract]

Column 0: News ID (string)
This is the ID of the news article
e.g. N89, N395, N60005, N3965770

Column 1: Category (string)
Category of the news. There are 18 categories.
e.g. sports, health, news, etc.

Column 2: SubCategory (string)
Sub-category of the news. There are 242 unique sub-categories.
e.g. golf, newsscienceandtechnology, medical, newsworld, etc.

Column 3: Title (string)
Title of the news article
e.g. PGA Tour winners, 50 Worst Habits For Belly Fat, etc.

Column 4: Abstract (string)
Abstract of the news article
e.g. A gallery of recent winners on the PGA Tour, These seemingly harmless habits are holding

Column 5: URL (string)
URL to the MSN site where the news article was published.
e.g. https://www.msn.com/en-us/sports/golf/pga-tour-winners/ss-AAjnQjj?ocid=chopendata

Column 6: Title Entities (string)
Entities present in the title

Column 7: Abstract Entities (string)
Entities present in the abstract

Let’s reload the data with their respective column names.

news_columns = ['did', 'cat', 'sub_cat', 'title', 'abstract', 'url', 'title_entities', 'abstract_entities']
news_train = cudf.read_csv(os.path.join(data_path_train , 'news.tsv'), 
                          header=None, 
                          names=news_columns,
                    sep='\t',)
news_train.head()
news_valid = cudf.read_csv(os.path.join(data_path_valid , 'news.tsv'), 
                          header=None, 
                          names=news_columns,
                    sep='\t',)
news_valid.head()

Step 4: Initial pre-processing to prepare the dataset for feature engineering

Before we can use the data in NVTabular for pre-processing and feature engineering, we have to make a few changes so that it is efficient for GPU operations.
The changes we have to make to the behaviors data file are:

  • The history column is one long string, not a list. NVTabular supports multi-hot categorical features, but the HugeCTR parquet reader does not. Thus, we need to extend the dataframe with multiple history columns, one for each element of this long string. While extending the history columns, we have to make sure we pick the most recent history items (in reverse chronological order).

  • The impression column contains a long string of unique positive and negative samples for the same impression event. Each of these unique values is a data point for our model to learn from, so these positive and negative entries should be unrolled into multiple rows. This row expansion operation is not supported in NVTabular, so we will perform it with cuDF (a toy sketch of this expansion follows this overview).

As for the news data file, we will only use the news ID, category and sub-category columns.
There are many ways to use the other columns (title, abstract, entities, etc.) as features, but we leave those for you to explore.

In a nutshell, we are going to take the raw downloaded dataset, perform this basic pre-processing with cuDF, and generate new train and validation datasets that will then be used for further processing.
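
As a toy sketch of the row expansion idea (hypothetical values; the notebook itself builds the expanded rows with a Python loop in Pre-process 4):

import cudf

toy = cudf.DataFrame({"impression_id": [1],
                      "impressions": ["N78206-0 N26368-1 N7578-1"]})

# Split the long string into a list column, then unroll it into one row per item
toy["impressions"] = toy["impressions"].str.split()
toy = toy.explode("impressions").reset_index(drop=True)

# Separate the news ID from the click label
parts = toy["impressions"].str.split("-", expand=True)
toy["impr"], toy["label"] = parts[0], parts[1]
print(toy.drop("impressions", axis=1))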

Pre-process 1: Drop columns from the news dataset

The columns that we will drop from news.tsv are: ‘title’, ‘abstract’, ‘url’, ‘title_entities’, ‘abstract_entities’.

We encourage you to explore using ‘title_entities’ and ‘abstract_entities’ as categorical features.

news_train = news_train.drop(['title', 'abstract', 'url', 'title_entities', 'abstract_entities'],axis = 1)
news_valid = news_valid.drop(['title', 'abstract', 'url', 'title_entities', 'abstract_entities'],axis = 1)

# Merging news train/valid dataset to have a single view of news and their attributes
news = cudf.concat([news_train,news_valid]).drop_duplicates().reset_index().drop(['index'],axis=1)

# Freeing up memory by nulling the variables
news_train = None
news_valid = None

news.head()

Pre-process 2: Label encoding for categorical variables

Strings require a significant amount of memory compared to integers. For example, representing the string cfcd208495d565ef66e7dff9f98764 as the integer 0 can save up to 90% of the memory.

Thus, we will label encode the categorical variables in our dataset so that the downstream pre-processing and feature engineering pipelines don’t consume a large amount of memory.
We will also label encode the low-cardinality columns in news.tsv, namely the news category and sub-category.
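
As a minimal illustration of the memory difference (toy values, not part of the pipeline):

import cudf

# Compare memory usage of a string column versus its integer category codes
uids = cudf.Series(["U87243", "U598644", "U87243", "U532401"] * 1000)
codes = uids.astype("category").cat.codes

print(uids.memory_usage())    # bytes used by the string column
print(codes.memory_usage())   # much smaller for the integer codes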

# Encoding user id from both train and validation dataframe
user_index = {} 

temp = cudf.concat([behaviors_train['uid'],behaviors_valid['uid']]).unique().to_pandas() 
for i in tqdm(range(len(temp)),total = len(temp)):
    user_index[temp[i]] = i + 1    
# Replacing uid in the dataset with their respective indexes

behaviors_train['uid'] = behaviors_train['uid'].replace([i for i in user_index],[str(user_index[i]) for i in user_index]).astype('int')
behaviors_valid['uid'] = behaviors_valid['uid'].replace([i for i in user_index],[str(user_index[i]) for i in user_index]).astype('int')

# Freeing up memory by nulling variables
user_index = None
# Encoding news id from the combined news dataframe
news_index = {}

for n,data in tqdm(news.to_pandas().iterrows(),total = len(news)):
    news_index[data['did']] = n + 1
# Encoding new's category and subcategories

cat = {}
subcat = {}

temp = news['cat'].unique()
for i in tqdm(range(len(temp)),total = len(temp)):
    cat[temp[i]] = i + 1
    
temp = news['sub_cat'].unique()
for i in tqdm(range(len(temp)),total = len(temp)):
    subcat[temp[i]] = i + 1

# Replacing did, cat and sub_cate with their respective indexes in the news dataframe
news = news.replace({'did': [i for i in news_index], 'cat': [i for i in cat], 'sub_cat': [i for i in subcat]},{'did': [str(news_index[i]) for i in news_index], 'cat': [str(cat[i]) for i in cat], 'sub_cat': [str(subcat[i]) for i in subcat]}).astype('int')
news = news.set_index('did').to_pandas().T.to_dict()

# Freeing up memory by nulling variables
temp = None
cat = None
subcat = None

We will replace the news IDs with their corresponding news_index in the behaviors dataframe in Pre-process 3.

Pre-process 3: Unroll items in history column

As an example, consider the row below from the behaviors dataframe:

| impression_id | uid | time | history | impressions |
|---|---|---|---|---|
| 1 | U64099 | 11/19/2019 11:37:45 AM | N121133 N104200 N43255 N55860 N128965 N38014 | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

We have to convert the single history column containing many news IDs into multiple history columns, each holding a single news ID.

| hist_0 | hist_1 | hist_2 | hist_3 | hist_4 | hist_5 |
|---|---|---|---|---|---|
| N121133 | N104200 | N43255 | N55860 | N128965 | N38014 |

Finally, we will add the news category and sub-category for these news IDs. The row after these transformations will look like this:

| impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions |
|---|---|---|---|---|---|---|---|---|---|
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

We observed that the maximum number of items in the history column is 400 and the mean is 30.
We have to unroll the same number of history items for each row, so we define a variable that controls this number. Feel free to increase it to include more items.
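
These statistics can be re-checked directly from the history column, using the same expression that later builds the hist_count feature:

# Quick check of history-length statistics on the training behaviors
hist_len = behaviors_train.history.str.count(" ") + 1
print(hist_len.max(), hist_len.mean())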

For this tutorial, max_hist, i.e. the number of history columns to be unrolled, is set to 10.

max_hist = 10

Let’s expand the history column into individual history columns, up to the limit max_hist. During the expansion, we use the last max_hist items from the history column, as those are the most recent ones (the news IDs in this column are ordered by time).

In addition, we save the length of the history in a separate column, which could be used as a feature too.

We will also replace the news IDs with their news indexes in the behaviors dataframe.
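
To see why we take columns 1 through max_hist after rsplit, here is a toy illustration with a short hypothetical history string and n=3:

import cudf

toy = cudf.Series(["N1 N2 N3 N4 N5"])
# Column 0 holds the unsplit remainder ("N1 N2"); columns 1..3 hold the 3 most recent items
print(toy.str.rsplit(n=3, expand=True))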

# Making a new gdf for storing history
hist = cudf.DataFrame() 

# Splitting the long string of history into several columns
hist[['hist_'+str(i) for i in range(max_hist)]] = behaviors_train.history.str.rsplit(n=max_hist,expand=True).fillna(0)[[i for i in range(1,max_hist+1)]]

# Replacing string news id in history with respective indexes
hist = hist.replace([i for i in news_index],[str(news_index[i]) for i in news_index]).astype('int')

# Appending news category corresponding to these newly created history columns
behaviors_train[['hist_cat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

# Appending news sub-category corresponding to these newly created history columns
behaviors_train[['hist_subcat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

# Creating a column for the length of history 
behaviors_train['hist_count'] = behaviors_train.history.str.count(" ")+1

# Dropping the long string history column
behaviors_train = behaviors_train.drop(['history'],axis=1)

# Freeing up memory by nulling variables
hist = None
# Repeating the same for validation set
hist = cudf.DataFrame()

hist[['hist_'+str(i) for i in range(max_hist)]] = behaviors_valid.history.str.rsplit(n=max_hist,expand=True).fillna(0)[[i for i in range(1,max_hist+1)]]

hist = hist.replace([i for i in news_index],[str(news_index[i]) for i in news_index]).astype('int')

behaviors_valid[['hist_cat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

behaviors_valid[['hist_subcat_'+str(i) for i in range(max_hist)]] = hist.replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

behaviors_valid['hist_count'] = behaviors_valid.history.str.count(" ")+1

behaviors_valid = behaviors_valid.drop(['history'],axis=1)

hist = None

Pre-process 4: Unroll items in the impression column

As an example, consider the row below from the behaviors dataframe, with the history columns already expanded:

| impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions |
|---|---|---|---|---|---|---|---|---|---|
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N78206-0 N26368-1 N7578-1 N58592-0 N19858-0 |

The impression column contains the positive and negative samples as one long string.

After unrolling one row of impressions into multiple rows, the resulting dataframe will look like this:

| impression_id | uid | time | hist_cat_0 | hist_cat_1 | hist_cat_2 | hist_subcat_3 | hist_subcat_4 | hist_subcat_5 | impressions | label |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N78206 | 0 |
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N26368 | 1 |
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N7578 | 1 |
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N58592 | 0 |
| 1 | U64099 | 11/19/2019 11:37:45 AM | sports | finance | entertainment | markets | celebrity | football_nfl | N19858 | 0 |

Note that all 5 generated rows share the same impression_id, uid, time and history columns.

We have observed that the maximum number of items in the impression column is 105 and the mean is 40.
We will limit the number of items to unroll from the impression column by defining the variable max_impr and setting it to 100. Feel free to increase or decrease this value.
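
As before, these statistics can be verified directly from the impressions column:

# Quick check of the number of items per impressions string
impr_len = behaviors_train.impressions.str.count(" ") + 1
print(impr_len.max(), impr_len.mean())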

Note - Make sure you’re using a GPU with at least 16GB of memory to avoid OOM errors with the values set below.

max_impr = 100

Row expansion is a memory- and I/O-intensive operation, so we will perform it in two steps. We will first create a dictionary mapping impressions to labels, and later merge it with the train set.

Let’s convert the impression column into a dictionary of lists, with the impression ID as the key and the impression items as the value.

# For train dataset
impr_train = behaviors_train.set_index('impression_id').impressions.to_pandas().str.split()
impr_train = impr_train.to_dict()
behaviors_train = behaviors_train.drop(['impressions'],axis=1)

# For validation dataset
impr_valid = behaviors_valid.set_index('impression_id').impressions.to_pandas().str.split()
impr_valid = impr_valid.to_dict()
behaviors_valid = behaviors_valid.drop(['impressions'],axis=1)

Since there are more negative samples (labelled 0) than positive samples, we can define a negative-to-positive ratio to sample a more balanced distribution.
For now, let’s set this variable to -1 to include all the samples from the impression column. Feel free to set it to a value greater than 1 to downsample the negative samples.

np_ratio = -1 # ratio of neg-to-pos samples

We iterate over the above dictionary to create a new dataframe in which each individual impression news item gets its own row along with its corresponding label.
This is a time-consuming operation!

# For train set

imp_id = []
imp_list = []
imp_label = []
for i in tqdm(impr_train,total = len(impr_train)):
    imp, label = np.transpose([[news_index[imp.split('-')[0]],imp.split('-')[1]] for imp in impr_train[i]])
    pos = (label == '1').sum()
    neg = 0
    for j in range(min(len(imp),max_impr)):
        if label[j] == '0' and np_ratio > -1:
            if neg <= pos*np_ratio :
                imp_id.append(i)
                imp_list.append(imp[j])
                imp_label.append(label[j])
                neg+=1
        else:
            imp_id.append(i)
            imp_list.append(imp[j])
            imp_label.append(label[j])

impr_train = None 

# Creating a new gdf with impression id, news id and its label
impressions_train = cudf.DataFrame({'imp_id': imp_id,'impr': imp_list,'label': imp_label})

# Appending news category corresponding to above impression news in the above created DataFrame
impressions_train['impr_cat'] = impressions_train['impr'].replace([int(i) for i in news],[int(news[i]['cat']) for i in news])

# Appending news sub-category corresponding to above impression news in above created DataFrame
impressions_train['impr_subcat'] = impressions_train['impr'].replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])

# Dropping the impr column now that its news attributes have been added
impressions_train = impressions_train.drop(['impr'],axis=1)

impressions_train.head()
# For validation set

imp_id = []
imp_list = []
imp_label = []
for i in tqdm(impr_valid,total = len(impr_valid)):
    imp, label = np.transpose([[news_index[imp.split('-')[0]],imp.split('-')[1]] for imp in impr_valid[i]])
    pos = (label == '1').sum()
    neg = 0
    for j in range(min(len(imp),max_impr)):
        if label[j] == '0' and np_ratio > -1:
            if neg <= pos*np_ratio :
                imp_id.append(i)
                imp_list.append(imp[j])
                imp_label.append(label[j])
                neg+=1
        else:
            imp_id.append(i)
            imp_list.append(imp[j])
            imp_label.append(label[j])

impr_valid = None 

impressions_valid = cudf.DataFrame({'imp_id': imp_id,'impr': imp_list,'label': imp_label})
impressions_valid['impr_cat'] = impressions_valid['impr'].replace([int(i) for i in news],[int(news[i]['cat']) for i in news])
impressions_valid['impr_subcat'] = impressions_valid['impr'].replace([int(i) for i in news],[int(news[i]['sub_cat']) for i in news])
impressions_valid = impressions_valid.drop(['impr'],axis=1)
impressions_valid.head()
# Freeing up memory by nulling variables
imp_id = None
imp_list = None
imp_label = None

Pre-process 5: Merge the behaviors and news datasets

Collating all the required columns from both the behaviors and news datasets will make the feature engineering process much faster.

We will merge the history columns (from the behaviors dataframe) with the impression data created above and save the result as a parquet file.
We will also re-initialize RMM with managed memory to allow the memory-intensive merge operation.

# For training set
rmm.reinitialize(managed_memory=True)

final_data = impressions_train.merge(behaviors_train,left_on = ['imp_id'],right_on = ['impression_id']).drop(['imp_id'],axis=1)
final_data = cudf.concat([final_data.drop(['time'],axis=1).astype('int'),final_data['time']],axis=1)
final_data.to_parquet(os.path.join(data_input_path, 'train.parquet'), compression = None)

# Freeing up memory by nulling variables
final_data=None
impressions_train = None
behaviors_train = None

#client.run(_rmm_pool)
# For validation set

final_data = impressions_valid.merge(behaviors_valid,left_on = ['imp_id'],right_on = ['impression_id']).drop(['imp_id'],axis=1)
final_data = cudf.concat([final_data.drop(['time'],axis=1).astype('int'),final_data['time']],axis=1)
final_data.to_parquet(os.path.join(data_input_path, 'valid.parquet'),compression = None)

# Freeing up memory by nulling variables
final_data=None
impressions_valid = None
behaviors_valid = None

Finally, we have our initial pre-processed data - train.parquet and valid.parquet - which will be used for feature engineering and further processing.

Step 5: Feature Engineering - time-based features

To get started with NVTabular, we’ll first use it to create simple time-based features extracted from the timestamp column in the behaviors data.

# Declaring features of train set that we created

cat_features = [
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid']

cont_features = ['hist_count']

labels = ['label']
# Creating time based features by extracting the relevant elements using cuDF

datetime = nvt.ColumnGroup(['time']) >> (lambda col: cudf.to_datetime(col,format="%m/%d/%Y %I:%M:%S %p"))

hour = datetime >> (lambda col: col.dt.hour) >> nvt.ops.Rename(postfix = '_hour')
minute = datetime >> (lambda col: col.dt.minute) >> nvt.ops.Rename(postfix = '_minute')
seconds = datetime >> (lambda col: col.dt.second) >> nvt.ops.Rename(postfix = '_second')

weekday = datetime >> (lambda col: col.dt.weekday) >> nvt.ops.Rename(postfix = '_wd')
day = datetime >> (lambda col: cudf.to_datetime(col, unit='s').dt.day) >> nvt.ops.Rename(postfix = '_day')

week = day >> (lambda col: (col/7).floor().astype('int')) >> nvt.ops.Rename(postfix = '_week')

To create embedding tables and to segregate the pre-processing functions (normalization, filling missing values, etc.) between categorical and continuous features, we define and pipeline them using NVTabular’s operator overloading.

cat_features = cat_features + hour + minute + seconds + weekday + day + week + datetime >> nvt.ops.Categorify(out_path = data_output_path)
cont_features = cont_features >> nvt.ops.FillMissing() >> nvt.ops.NormalizeMinMax()
labels = ['label']

We can visualize the complete workflow pipeline.

output = cat_features + cont_features
output.graph

To run this graph, we create a workflow object that calculates statistics and performs the relevant transformations.

proc = nvt.Workflow(cat_features + cont_features + labels[0])
# Initialize a nvt.Dataset from parquet file that was created in step 4.

data_train = nvt.Dataset(os.path.join(data_input_path, "train.parquet"), engine="parquet",part_size="256MB")
data_valid = nvt.Dataset(os.path.join(data_input_path, "valid.parquet"), engine="parquet",part_size="256MB")

Since we are going to train the DNNs using HugeCTR, we need to conform to the following dtypes:

  • categorical feature columns in int64

  • continuous feature columns in float32

  • label columns in float32

We will make a dictionary with column names as keys and the required data types as values. This dictionary will be used by NVTabular for type casting.

dict_dtypes={}

for col in cat_features.columns:
    dict_dtypes[col] = np.int64

for col in cont_features.columns:
    dict_dtypes[col] = np.float32

for col in labels:
    dict_dtypes[col] = np.float32

Let’s fit the workflow on the training set to record the statistics.

%%time
proc.fit(data_train)

Next, we apply the transformation to the dataset and persist it to disk as parquet.

%%time

# For training set
proc.transform(data_train).to_parquet(output_path= output_train_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)
%%time

# For validation set
proc.transform(data_valid).to_parquet(output_path= output_valid_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)

Let’s load the NVTabular processed parquet files and look at our first NVTabular pre-processed dataset.

df = dask_cudf.read_parquet(os.path.join(output_train_path, '*.parquet'))
df.head()

After transforming the data and persisting it to disk, the following files are created:

  1. parquet

    • The number of parquet files depends on out_files_per_proc in proc.transform()

  2. _file_list.txt

    • The 1st line contains the number of parquet files

    • The subsequent lines are the paths to each parquet file.

  3. _metadata.json

    • This file is used by HugeCTR in parsing the processed parquet files.

    • ‘file_stats’ contains the name of the parquet files and their corresponding number of rows.

    • ‘cats’ is a list of categorical features/columns in the dataset and their index.

    • ‘conts’ is a list of continuous/dense columns in the dataset and their index.

    • ‘labels’ is a list of labels in the dataset and their index.

    • This file shouldn’t be edited manually.
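
Before that, we can peek at _file_list.txt to confirm its structure (first line: number of parquet files; following lines: their paths):

# Print the first few lines of the generated _file_list.txt
with open(os.path.join(output_train_path, '_file_list.txt'), 'r') as f:
    for line in f.readlines()[:4]:
        print(line.strip())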

Let’s look at the contents of _metadata.json

with open(os.path.join(output_train_path, '_metadata.json'),'r') as f:
    metadata = json.load(f)

metadata

Next, we need to get the embedding size for the categorical variables. This will be an important input for defining the embedding table size to be used by HugeCTR.

from nvtabular.ops import get_embedding_sizes
embeddings_simple_time = get_embedding_sizes(proc)
embeddings_simple_time
# Reformatting the above output for ease of copy paste in HugeCTRs config.json

embedding_size_str_simple_time = [embeddings_simple_time[x][0] for x in cat_features.columns]
embedding_size_str_simple_time

We can also check the names of the categorical and continuous features that we’ve defined. These should match the cats and conts lists in _metadata.json.

cat_features.columns
cont_features.columns

Before moving on to training a DNN, let’s try a few more complex feature engineering techniques using NVTabular. We will later train DNNs on both of these feature-engineered datasets and compare their performance.

Step 6: Feature Engineering - count and target encoding

We will now perform count and target encoding on the pre-processed dataset generated in Step 4. Let’s start by defining directories for the input dataset and the output processed dataset.

# Define our worker/output directories
dask_workdir = os.path.join(BASE_DIR, "workdir")

# Using the pre-processed parquet files from Step 4 (under dataset/) as input for the new workflow
data_input_path = os.path.join(BASE_DIR, "dataset")

# Defining new directories for output
data_output_path = os.path.join(BASE_DIR, "processed_ce-te")
output_train_path = os.path.join(data_output_path, "train")
output_valid_path = os.path.join(data_output_path, "valid")

# Creating and cleaning our worker/output directories
try:
    # Ensure BASE_DIR exists
    if not os.path.isdir(BASE_DIR):
        os.mkdir(BASE_DIR)
        
    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.mkdir(dask_workdir)

    # Make sure we have a clean output path for our new dataset
    if os.path.isdir(data_output_path):
        shutil.rmtree(data_output_path)
        
    os.mkdir(data_output_path)
    os.mkdir(output_train_path)
    os.mkdir(output_valid_path)

except OSError:
    print ("Creation of the directories failed")
else:
    print ("Successfully created the directories")

As you can see, we have created a new directory named processed_ce-te. The complete directory structure is now:

basedir
  |— workdir
  |— dataset
    |— train
    |— valid
  |— processed_nvt
    |— train
    |— valid
  |— processed_ce-te
    |— train
    |— valid
  |— configs
  |— weights

Again, we define the categorical and continuous features based on the pre-processed data generated in Step 4.

cat_features = ['hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',]

cont_features = ['hist_count']

labels = ['label']

Count encoding calculates the frequency of one or more categorical features. For the purpose of this tutorial, we will count how often the user had previously clicked on news with the same category/sub-category as the impression news.

To calculate the occurrence of the same news category/sub-category in the history, we iterate over each row and compare its history columns with the category/sub-category of the impression news.
Let’s start by defining helper functions that count the category and sub-category matches from the history columns. These helper functions are used by apply_rows() inside the custom operator create_count_features.

We can also limit the number of history columns considered for count encoding. For now, let’s use all 10 history columns that we have in the dataset.
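
As a toy illustration of the feature we are about to compute (hypothetical category indexes for a single row):

# For one row: count how many of the 10 history categories match the impression category
hist_cats = [12, 7, 12, 3, 12, 0, 0, 0, 0, 0]   # hist_cat_0 ... hist_cat_9
impr_cat = 12                                    # category of the impression news
impr_cat_count = sum(1 for c in hist_cats if c == impr_cat)
print(impr_cat_count)   # 3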

max_hist = 10
def add_cat_count(
         hist_cat_0,
         hist_cat_1,
         hist_cat_2,
         hist_cat_3,
         hist_cat_4,
         hist_cat_5,
         hist_cat_6,
         hist_cat_7,
         hist_cat_8,
         hist_cat_9,
         impr_cat,
         impr_cat_count,
         k):
    
    # Following loop iterates over each row of columns hist_cat_0->9 and impr_cat
    for i, temp in enumerate(zip(hist_cat_0,
                                 hist_cat_1,
                                 hist_cat_2,
                                 hist_cat_3,
                                 hist_cat_4,
                                 hist_cat_5,
                                 hist_cat_6,
                                 hist_cat_7,
                                 hist_cat_8,
                                 hist_cat_9,
                                 impr_cat,
                                )):
        
        # Count, for this row, how many history categories match the impression category
        count = k
        for j in temp[:-1]:
            if j == temp[-1]:
                count += 1

        # Update the count in the corresponding row of the output column (impr_cat_count)
        impr_cat_count[i] = count
def add_subcat_count(
         hist_subcat_0,
         hist_subcat_1,
         hist_subcat_2,
         hist_subcat_3,
         hist_subcat_4,
         hist_subcat_5,
         hist_subcat_6,
         hist_subcat_7,
         hist_subcat_8,
         hist_subcat_9,
         impr_subcat,
         impr_subcat_count,
         k):
    
    # Following loop iterates over each row of columns hist_subcat_0->9 and impr_subcat
    for i, temp in enumerate(zip(
                                 hist_subcat_0,
                                 hist_subcat_1,
                                 hist_subcat_2,
                                 hist_subcat_3,
                                 hist_subcat_4,
                                 hist_subcat_5,
                                 hist_subcat_6,
                                 hist_subcat_7,
                                 hist_subcat_8,
                                 hist_subcat_9,
                                 impr_subcat,
                                )):

        # Count, for this row, how many history sub-categories match the impression sub-category
        count = k
        for j in temp[:-1]:
            if j == temp[-1]:
                count += 1

        # Update the count in the corresponding row of the output column (impr_subcat_count)
        impr_subcat_count[i] = count

To add the count encodings for categories and sub-categories to each row, we write a custom NVTabular operator by inheriting from the Operator class and defining the transform and output_column_names methods.

class create_count_features(Operator):
    def transform(self, columns, gdf):
        if columns[-1] == 'impr_cat':
            gdf = gdf.apply_rows(add_cat_count,incols = ['hist_cat_{}'.format(i) for i in range(max_hist)]+['impr_cat'],outcols = {'impr_cat_count': np.int64},kwargs={'k': 0})
            return(gdf.drop(columns,axis=1))
        if columns[-1] == 'impr_subcat':
            gdf = gdf.apply_rows(add_subcat_count,incols = ['hist_subcat_{}'.format(i) for i in range(max_hist)]+['impr_subcat'],outcols = {'impr_subcat_count': np.int64},kwargs={'k': 0})
            return(gdf.drop(columns,axis=1))

    def output_column_names(self, columns):
        col = []
        if columns[-1] == 'impr_cat':
            col.append('impr_cat_count')
        if columns[-1] == 'impr_subcat':
            col.append('impr_subcat_count')
        return col

    def dependencies(self):
        return None

Target encoding averages the target value by some category or group. This technique captures the numeric relationship between the categorical features and the target.

We have observed that the hist_cat columns are the most suitable for target encoding. Rather than using just one history category column, we also found that a group of history columns encodes the target probabilities better.

For this tutorial, we are going to use 5 history category columns, in a moving-window fashion, along with the impression category column, to calculate the target encoding.
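
Conceptually, a smoothed target encoding blends each group’s mean label with the global mean, weighted by the group size and the smoothing factor (p_smooth). A minimal sketch of that idea on a toy dataframe (not NVTabular’s exact k-fold implementation):

import cudf

toy = cudf.DataFrame({"impr_cat": [1, 1, 1, 2, 2], "label": [1, 0, 1, 0, 0]})
p_smooth = 20
global_mean = toy["label"].mean()

grp = toy.groupby("impr_cat")["label"]
# Smoothed mean: (sum_of_labels + p_smooth * global_mean) / (count + p_smooth)
te = (grp.sum() + p_smooth * global_mean) / (grp.count() + p_smooth)
print(te)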

te_columns = [['hist_cat_'+str(j) for j in range(i-5+1, i+1)] + ['impr_cat'] for i in range(4, max_hist)]
target_encode = (
    te_columns >>
    nvt.ops.TargetEncoding(
        ['label'],
        out_path = BASE_DIR,
        kfold=5,
        p_smooth=20,
        out_dtype="float32",
    )
)

We’ll also create the time-based features in the same way as we did in Step 5.

datetime = nvt.ColumnGroup(['time']) >> (lambda col: cudf.to_datetime(col,format="%m/%d/%Y %I:%M:%S %p"))

hour = datetime >> (lambda col: col.dt.hour) >> nvt.ops.Rename(postfix = '_hour')
minute = datetime >> (lambda col: col.dt.minute) >> nvt.ops.Rename(postfix = '_minute')
seconds = datetime >> (lambda col: col.dt.second) >> nvt.ops.Rename(postfix = '_second')

weekday = datetime >> (lambda col: col.dt.weekday) >> nvt.ops.Rename(postfix = '_wd')
day = datetime >> (lambda col: cudf.to_datetime(col, unit='s').dt.day) >> nvt.ops.Rename(postfix = '_day')

week = day >> (lambda col: (col/7).floor().astype('int')) >> nvt.ops.Rename(postfix = '_week')
cat_count_encode = ['hist_cat_{}'.format(i) for i in range(max_hist)] + ['impr_cat'] >> create_count_features()

subcat_count_encode = ['hist_subcat_{}'.format(i) for i in range(max_hist)] + ['impr_subcat'] >> create_count_features()
cat_features = cat_features + datetime + hour + minute + seconds + weekday + day + week >> nvt.ops.Categorify(out_path = data_output_path)
cont_features = cont_features + cat_count_encode + subcat_count_encode >> nvt.ops.FillMissing() >> nvt.ops.NormalizeMinMax()
cont_features += target_encode >> nvt.ops.Rename(postfix = '_TE')

We can visualize the complete workflow pipeline.

output = cat_features + cont_features
output.graph
proc = nvt.Workflow(cat_features + cont_features + labels[0])

We initialize nvt.Dataset objects from the parquet files that were created in Step 4.

data_train = nvt.Dataset(os.path.join(data_input_path, "train.parquet"), engine="parquet",part_size="256MB")
data_valid = nvt.Dataset(os.path.join(data_input_path, "valid.parquet"), engine="parquet",part_size="256MB")
dict_dtypes={}

for col in cat_features.columns:
    dict_dtypes[col] = np.int64

for col in cont_features.columns:
    dict_dtypes[col] = np.float32

for col in labels:
    dict_dtypes[col] = np.float32

Let’s fit the workflow on our training dataset to record the statistics.

%%time
proc.fit(data_train)
%%time

# For training set
proc.transform(data_train).to_parquet(output_path=output_train_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)
%%time

# For validation set
proc.transform(data_valid).to_parquet(output_path=output_valid_path,
                                shuffle=nvt.io.Shuffle.PER_PARTITION,
                                dtypes=dict_dtypes,
                                out_files_per_proc=10,
                                cats = cat_features.columns,
                                conts = cont_features.columns,
                                labels = labels)
rmm.reinitialize(managed_memory=False)

Let’s take a quick look at the contents of _metadata.json

with open(os.path.join(output_train_path, '_metadata.json'),'r') as f:
    metadata = json.load(f)

metadata
from nvtabular.ops import get_embedding_sizes
embeddings_count_encode =  get_embedding_sizes(proc)
embeddings_count_encode
# Reformatting the above output for ease of copy paste in HugeCTRs config.json

embedding_size_str_count_encode = [embeddings_count_encode[x][0] for x in cat_features.columns]
embedding_size_str_count_encode

Now that we have two versions of our dataset ready, one with time-based features and the other with count and target encoded features, we can start training a few DNNs using HugeCTR.

Step 7: Train DNN with HugeCTR

In this section, we will train a Deep Learning Recommendation Model (DLRM) using HugeCTR’s low-level Python API. We will also use the inference Python API for evaluation on the validation set.

We will train two models, one for each of the two datasets that we’ve processed.

Train config JSON for the simple time-based features dataset

The low-level Python train API requires a train config.json with the arguments and definitions of various training parameters, such as the optimizer, the number of iterations, the neural architecture and the dataset.
As a first step, we will build this config file for our feature-engineered dataset and the DLRM model.

# Define paths to save the config and the weights

config_file_path = os.path.join(config_output_path,'dlrm_fp32_simple-time_1gpu.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_simple-time_1gpu/')

# Creating Directory inside weights folder
if os.path.isdir(weights_output_path):
    shutil.rmtree(weights_output_path)
os.mkdir(weights_output_path)

For the DLRM model, let’s take the dlrm_fp32_64k.json sample file that is available in HugeCTR’s GitHub repository and modify it for our dataset.

The parameters that we should modify are:

  • solver:
    - max_iter: Number of training samples / batch size (see the quick sketch after this list)
    - gpu: List of GPU IDs to use for training
    - batchsize: Number of samples to process per training batch
    - eval_interval: Number of iterations after which evaluation is triggered on the validation set

  • optimizer:
    - type: Adam
    - learning_rate: 1e-4 (a smaller value to begin with)

  • layers:
    - format: Parquet (since our dataset is in parquet)
    - source and eval_source: Paths to _file_list.txt for the train and eval datasets produced by NVTabular
    - slot_num: For LocalizedSlot, set it to the number of categorical features
    - max_feature_num_per_sample: For LocalizedSlot, this can be the same as slot_num
    - slot_size_array: Cardinalities of the categorical features (in the same order as the column names in the ‘cats’ list of _metadata.json)
    - embedding_vec_size: Dimension of the embedding vectors for the categorical features
    - label_dim: Label dimension
    - dense_dim: Number of dense/continuous features
    - sparse: Dimensions of the categorical features
    - DLRM layer fc3: The output dimension of fc3 should be the same as embedding_vec_size
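
As a quick sketch of the max_iter arithmetic, the training row count can be read back from the _metadata.json produced in Step 5 (this assumes the file_stats entries carry a num_rows field, as NVTabular writes them):

import json, os

# Rough number of iterations for one pass over the time-based training set
with open(os.path.join(BASE_DIR, "processed_nvt/train", "_metadata.json")) as f:
    meta = json.load(f)

num_train_rows = sum(entry["num_rows"] for entry in meta["file_stats"])
batchsize = 2048
print(num_train_rows // batchsize)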

We’ve built one such train config.json below, with the appropriate paths to the dataset and a default batch size for a 32GB GPU.
Let’s make use of the data paths and other variables defined in the steps above, and re-define the ones that may have changed during pre-processing.

# Path to the simple time based feature processed dataset
output_train_path = os.path.join(BASE_DIR, "processed_nvt/train")
output_valid_path = os.path.join(BASE_DIR, "processed_nvt/valid")

# Model related parameter
embedding_vec_size = 4
optimizer = {
        "type": "Adam",
        "update_type": "Local",
        "adam_hparam": {
            "learning_rate": 0.001,
            "beta1": 0.9,
            "beta2": 0.999,
            "epsilon": 1e-07,
            "warmup_steps": 10000,
            "decay_start": 20000,
            "decay_steps": 200000,
            "decay_power": 1,
            "end_lr": 0.000001
        }
    }

layers = [
        {
            "name": "data",
            "type": "Data",
            "format": "Parquet",
            "slot_size_array": embedding_size_str_simple_time,
            "source": output_train_path+"/_file_list.txt",
            "eval_source": output_valid_path+"/_file_list.txt",
            "check": "None",
            "label": {
                "top": "label",
                "label_dim": 1
            },
            "dense": {
                "top": "dense",
                "dense_dim": 1
            },
            "sparse": [
                {
                    "top": "data1",
                    "type": "LocalizedSlot",
                    "max_feature_num_per_sample": len(embeddings_simple_time),
                    "max_nnz": 1,
                    "slot_num": len(embeddings_simple_time)
                }
            ]
        },
        {
            "name": "sparse_embedding1",
            "type": "LocalizedSlotSparseEmbeddingHash",
            "bottom": "data1",
            "top": "sparse_embedding1",
            "sparse_embedding_hparam": {
                "slot_size_array": embedding_size_str_simple_time,
                "embedding_vec_size": embedding_vec_size,
                "combiner": 0
            }
        },
        {
            "name": "fc1",
            "type": "InnerProduct",
            "bottom": "dense",
            "top": "fc1",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu1",
            "type": "ReLU",
            "bottom": "fc1",
            "top": "relu1"
        },
        {
            "name": "fc2",
            "type": "InnerProduct",
            "bottom": "relu1",
            "top": "fc2",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu2",
            "type": "ReLU",
            "bottom": "fc2",
            "top": "relu2"
        },
        {
            "name": "fc3",
            "type": "InnerProduct",
            "bottom": "relu2",
            "top": "fc3",
            "fc_param": {
                "num_output": embedding_vec_size
            }
        },
        {
            "name": "relu3",
            "type": "ReLU",
            "bottom": "fc3",
            "top": "relu3"
        },
        {
            "name": "interaction1",
            "type": "Interaction",
            "bottom": [
                "relu3",
                "sparse_embedding1"
            ],
            "top": "interaction1"
        },
        {
            "name": "fc4",
            "type": "InnerProduct",
            "bottom": "interaction1",
            "top": "fc4",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu4",
            "type": "ReLU",
            "bottom": "fc4",
            "top": "relu4"
        },
        {
            "name": "fc5",
            "type": "InnerProduct",
            "bottom": "relu4",
            "top": "fc5",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu5",
            "type": "ReLU",
            "bottom": "fc5",
            "top": "relu5"
        },
        {
            "name": "fc6",
            "type": "InnerProduct",
            "bottom": "relu5",
            "top": "fc6",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu6",
            "type": "ReLU",
            "bottom": "fc6",
            "top": "relu6"
        },
        {
            "name": "fc7",
            "type": "InnerProduct",
            "bottom": "relu6",
            "top": "fc7",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu7",
            "type": "ReLU",
            "bottom": "fc7",
            "top": "relu7"
        },
        {
            "name": "fc8",
            "type": "InnerProduct",
            "bottom": "relu7",
            "top": "fc8",
            "fc_param": {
                "num_output": 1
            }
        },
        {
            "name": "loss",
            "type": "BinaryCrossEntropyLoss",
            "bottom": [
                "fc8",
                "label"
            ],
            "top": "loss"
        }
    ]
config = {
    "optimizer": optimizer,
    "layers": layers
}

with open(config_file_path,'w') as f:
    json.dump(config,f,indent = 4)

Finally, we can start the training using the above config file.

import os
from hugectr import Session, solver_parser_helper,get_learning_rate_scheduler
from mpi4py import MPI

# Solver related parameters
NUM_GPUS = [0]                                                     # GPUs used for training
json_file = config_file_path                                       # Path to the json config file
batchsize = 2048                                                   # Batch size used for training
batchsize_eval = 2048                                              # Batch size used for evaluation
max_eval_batches = 3768                                            # Iterations required to go through the complete validation set with the set batchsize_eval

# Training related parameters
num_iter = 30001                                                   # Iterations to train the model for
eval_trigger = 10000                                               # Start evaluation after these iterations
snapshot_trigger = 10000                                           # Save model checkpoints after these iterations

solver_config = solver_parser_helper(
                                    seed = 0,
                                    batchsize = batchsize,                   # Minibatch size for training
                                    batchsize_eval = batchsize_eval,         # Minibatch size for eval 
                                    max_eval_batches = max_eval_batches,     # Max no. of eval batches on which eval will be done
                                    model_file = "",                         # Load any pretrained model , if training from scratch, leave empty
                                    embedding_files = [],                    # Path to trained embedding table, if training from scratch then leave empty
                                    vvgpu = [NUM_GPUS],                      # GPU Indices to be used ofr training
                                    use_mixed_precision = False,             # Flag to indicate use of Mixed precision training 
                                    scaler = 1024,                           # To be set when MixedPrecisiontraining is ON
                                    i64_input_key = True,                    # As we are using Parquet from NVTabular, I64 should be true 
                                    use_algorithm_search = False,            # Enable algo search within the fully connected-layers
                                    use_cuda_graph = False,                  # Enable cuda graph for forward and back proppogation
                                    repeat_dataset = True                    # Repeat the dataset for training, True for Non Epoch Based Training
                                    )

lr_sch = get_learning_rate_scheduler(json_file)                    # Get learning rate statistics from optimizers     

sess = Session(solver_config, json_file)                           # Initialise a Session Object
sess.start_data_reading()                                          # Start Data Reading

for i in range(num_iter):                                          # Start training loop
    lr = lr_sch.get_next()                                         # Update learning rate parameters                                   
    sess.set_learning_rate(lr)                                     # Pass the updated learning rate to the session
    sess.train()                                                   # Train on 1 iteration on 1 Minibatch

    if (i%1000 == 0):
        loss = sess.get_current_loss()                             # Returns the loss value for the current iteration.
        print("[HUGECTR][INFO] iter: {}; loss: {:.6f}; lr: {:.6f}".format(i, loss, lr))
    if (i%eval_trigger == 0 and i != 0):
        sess.check_overflow()                                      # Checks whether any embedding has encountered overflow
        sess.copy_weights_for_evaluation()                         # Copies the weights of the dense network from training layers to evaluation layers.
        for _ in range(solver_config.max_eval_batches):
            sess.eval()                                            # Calculates the evaluation metrics based on one minibatch of evaluation data
        metrics = sess.get_eval_metrics()                          # Returns the average evaluation metrics of several minibatches of evaluation data.
        print("[HUGECTR][INFO] iter: {}, {}".format(i, metrics))
    if (i%snapshot_trigger == 0 and i != 0):
        sess.download_params_to_files(weights_output_path, i)      # Save a model snapshot

Train config JSON for the count and target encoded features dataset

Following the same methodology as above, we will modify the DLRM config file for this version of the dataset.

# Define paths to save the config and the weights

config_file_path = os.path.join(config_output_path, 'dlrm_fp32_count-target-encode_1gpu.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_count-target-encode_1gpu/')

# Create a clean directory inside the weights folder for this model's checkpoints
if os.path.isdir(weights_output_path):
    shutil.rmtree(weights_output_path)
os.mkdir(weights_output_path)
# Paths to the count and target encoded processed dataset
output_train_path = os.path.join(BASE_DIR, "processed_ce-te/train")
output_valid_path = os.path.join(BASE_DIR, "processed_ce-te/valid")
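# Compared with the simple time based config, three things change here: the slot
# sizes come from embedding_size_str_count_encode, dense_dim grows from 1 to 9
# (six target encoded plus three count features), and the data sources point at
# the processed_ce-te dataset.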

# Model related parameters
embedding_vec_size = 4
optimizer = {
        "type": "Adam",
        "update_type": "Local",
        "adam_hparam": {
            "learning_rate": 0.001,
            "beta1": 0.9,
            "beta2": 0.999,
            "epsilon": 1e-07,
            "warmup_steps": 10000,
            "decay_start": 20000,
            "decay_steps": 200000,
            "decay_power": 1,
            "end_lr": 1e-06
        }
    }

layers = [
        {
            "name": "data",
            "type": "Data",
            "format": "Parquet",
            "slot_size_array": embedding_size_str_count_encode,
            "source": output_train_path+"/_file_list.txt",
            "eval_source": output_valid_path+"/_file_list.txt",
            "check": "None",
            "label": {
                "top": "label",
                "label_dim": 1
            },
            "dense": {
                "top": "dense",
                "dense_dim": 9
            },
            "sparse": [
                {
                    "top": "data1",
                    "type": "LocalizedSlot",
                    "max_feature_num_per_sample": len(embeddings_count_encode),
                    "max_nnz": 1,
                    "slot_num": len(embeddings_count_encode)
                }
            ]
        },
        {
            "name": "sparse_embedding1",
            "type": "LocalizedSlotSparseEmbeddingHash",
            "bottom": "data1",
            "top": "sparse_embedding1",
            "sparse_embedding_hparam": {
                "slot_size_array": embedding_size_str_count_encode,
                "embedding_vec_size": embedding_vec_size,
                "combiner": 0
            }
        },
        {
            "name": "fc1",
            "type": "InnerProduct",
            "bottom": "dense",
            "top": "fc1",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu1",
            "type": "ReLU",
            "bottom": "fc1",
            "top": "relu1"
        },
        {
            "name": "fc2",
            "type": "InnerProduct",
            "bottom": "relu1",
            "top": "fc2",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu2",
            "type": "ReLU",
            "bottom": "fc2",
            "top": "relu2"
        },
        {
            "name": "fc3",
            "type": "InnerProduct",
            "bottom": "relu2",
            "top": "fc3",
            "fc_param": {
                "num_output": embedding_vec_size
            }
        },
        {
            "name": "relu3",
            "type": "ReLU",
            "bottom": "fc3",
            "top": "relu3"
        },
        {
            "name": "interaction1",
            "type": "Interaction",
            "bottom": [
                "relu3",
                "sparse_embedding1"
            ],
            "top": "interaction1"
        },
        {
            "name": "fc4",
            "type": "InnerProduct",
            "bottom": "interaction1",
            "top": "fc4",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu4",
            "type": "ReLU",
            "bottom": "fc4",
            "top": "relu4"
        },
        {
            "name": "fc5",
            "type": "InnerProduct",
            "bottom": "relu4",
            "top": "fc5",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu5",
            "type": "ReLU",
            "bottom": "fc5",
            "top": "relu5"
        },
        {
            "name": "fc6",
            "type": "InnerProduct",
            "bottom": "relu5",
            "top": "fc6",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu6",
            "type": "ReLU",
            "bottom": "fc6",
            "top": "relu6"
        },
        {
            "name": "fc7",
            "type": "InnerProduct",
            "bottom": "relu6",
            "top": "fc7",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu7",
            "type": "ReLU",
            "bottom": "fc7",
            "top": "relu7"
        },
        {
            "name": "fc8",
            "type": "InnerProduct",
            "bottom": "relu7",
            "top": "fc8",
            "fc_param": {
                "num_output": 1
            }
        },
        {
            "name": "loss",
            "type": "BinaryCrossEntropyLoss",
            "bottom": [
                "fc8",
                "label"
            ],
            "top": "loss"
        }
    ]
config = {
    "optimizer": optimizer,
    "layers": layers
}
with open(config_file_path,'w') as f:
    json.dump(config,f,indent = 4)
# Solver related parameters
NUM_GPUS = [0]                                               # GPUs used for training
json_file = config_file_path                                       # Path to the json config file
batchsize = 2048                                                   # Batch size used for training
batchsize_eval = 2048                                              # Batch size used during evaluation
max_eval_batches = 3768                                            # Iterations required to go through the complete validation set with the set batchsize_eval
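# A quick sanity check (a minimal sketch, assuming the processed validation
# parquet at output_valid_path fits in pandas): max_eval_batches should roughly
# equal ceil(n_validation_rows / batchsize_eval).
# import math
# import pandas as pd
# print(math.ceil(len(pd.read_parquet(output_valid_path)) / batchsize_eval))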

# Training related parameters
num_iter = 30001                                                   # Iterations to train the model for
eval_trigger = 10000                                               # Start evaluation after these iterations
snapshot_trigger = 10000                                           # Save model checkpoints after these iterations

solver_config = solver_parser_helper(
                                    seed = 0,
                                    batchsize = batchsize,                   # Minibatch size for training
                                    batchsize_eval = batchsize_eval,         # Minibatch size for evaluation
                                    max_eval_batches = max_eval_batches,     # Max no. of eval batches used per evaluation
                                    model_file = "",                         # Path to a pretrained dense model; leave empty when training from scratch
                                    embedding_files = [],                    # Paths to trained embedding tables; leave empty when training from scratch
                                    vvgpu = [NUM_GPUS],                      # GPU indices to be used for training
                                    use_mixed_precision = False,             # Flag to enable mixed precision training
                                    scaler = 1024,                           # Loss scaling factor, used when mixed precision training is ON
                                    i64_input_key = True,                    # True because NVTabular's Parquet output uses int64 keys
                                    use_algorithm_search = False,            # Enable algorithm search for the fully connected layers
                                    use_cuda_graph = False,                  # Enable CUDA graphs for forward and backward propagation
                                    repeat_dataset = True                    # Repeat the dataset; True for non-epoch-based training
                                    )

lr_sch = get_learning_rate_scheduler(json_file)                    # Build the learning rate scheduler from the optimizer clause of the config

sess = Session(solver_config, json_file)                           # Initialise a Session Object
sess.start_data_reading()                                          # Start Data Reading

for i in range(num_iter):                                          # Start training loop
    lr = lr_sch.get_next()                                         # Get the next learning rate from the scheduler
    sess.set_learning_rate(lr)                                     # Pass the updated learning rate to the session
    sess.train()                                                   # Train for one iteration on one minibatch

    if (i%1000 == 0):
        loss = sess.get_current_loss()                             # Returns the loss value for the current iteration.
        print("[HUGECTR][INFO] iter: {}; loss: {:.6f}; lr: {:.6f}".format(i, loss, lr))
    if (i%eval_trigger == 0 and i != 0):
        sess.check_overflow()                                      # Checks whether any embedding has encountered overflow
        sess.copy_weights_for_evaluation()                         # Copies the weights of the dense network from training layers to evaluation layers.
        for _ in range(solver_config.max_eval_batches):
            sess.eval()                                            # Calculates the evaluation metrics based on one minibatch of evaluation data
        metrics = sess.get_eval_metrics()                          # Returns the average evaluation metrics of several minibatches of evaluation data.
        print("[HUGECTR][INFO] iter: {}, {}".format(i, metrics))
    if (i%snapshot_trigger == 0 and i != 0):
        sess.download_params_to_files(weights_output_path, i)      # Save a model snapshot

Step 8: Inference on 1st validation set with HugeCTR

After training the two DLRM models, let’s evaluate them on the validation set using HugeCTR’s Python inference API. The evaluation metric is AUC.
We will first write the inference config file, then prepare the validation data in CSR format, and finally use the inference APIs to get the predictions.

Let’s start with the first trained model, i.e. the DLRM trained on simple time based features. In the next step, we will repeat the same process for the second trained model.

Inference config JSON

We first need to prepare an inference JSON file. The important modifications relative to the train config file are:

  • Omit the optimizer and solver clauses, and add an inference clause

  • Change the output layer to Sigmoid type.

Let’s first write the inference config file for the simple time based feature dataset.
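As a rough sketch of these modifications (assuming the simple time based training config was saved as 'dlrm_fp32_simple-time_1gpu.json' under config_output_path), the layer part of the transformation could be done programmatically:

import json, os

# Assumed filename of the training config written earlier in this notebook
train_config_path = os.path.join(config_output_path, 'dlrm_fp32_simple-time_1gpu.json')
with open(train_config_path) as f:
    train_config = json.load(f)

# Drop the BinaryCrossEntropyLoss layer and append a Sigmoid output layer;
# the optimizer clause is simply not copied over to the inference config.
infer_layers = [layer for layer in train_config["layers"] if layer["type"] != "BinaryCrossEntropyLoss"]
infer_layers.append({"name": "sigmoid", "type": "Sigmoid", "bottom": ["fc8"], "top": "sigmoid"})

The hand-written config below does the same and additionally trims the dataset-specific fields (format, sources and slot_size_array) from the data layer.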

import json

config_inference_file_path = os.path.join(config_output_path,'dlrm_fp32_simple-time_1gpu_inference.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_simple-time_1gpu/')
# Add the paths to the dense and sparse model checkpoints saved during training
inference = {
    "max_batchsize": 2048,
    "dense_model_file": weights_output_path+"/_dense_30000.model",
    "sparse_model_file": weights_output_path+"/0_sparse_30000.model",
    "input_key_type": "I64"
  }

layers = [
      {
            "name": "data",
            "type": "Data",
            "check": "None",
            "label": {
                "top": "label",
                "label_dim": 1
            },
            "dense": {
                "top": "dense",
                "dense_dim": 1
            },
            "sparse": [
                {
                    "top": "data1",
                    "type": "LocalizedSlot",
                    "max_feature_num_per_sample": len(embedding_size_str_simple_time),
                    "max_nnz": 1,
                    "slot_num": len(embedding_size_str_simple_time)
                }
            ]
        },
        {
            "name": "sparse_embedding1",
            "type": "LocalizedSlotSparseEmbeddingHash",
            "bottom": "data1",
            "top": "sparse_embedding1",
            "sparse_embedding_hparam": {
                "slot_size_array": embedding_size_str_simple_time,
                "embedding_vec_size": embedding_vec_size,
                "combiner": 0
            }
        },
        {
            "name": "fc1",
            "type": "InnerProduct",
            "bottom": "dense",
            "top": "fc1",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu1",
            "type": "ReLU",
            "bottom": "fc1",
            "top": "relu1"
        },
        {
            "name": "fc2",
            "type": "InnerProduct",
            "bottom": "relu1",
            "top": "fc2",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu2",
            "type": "ReLU",
            "bottom": "fc2",
            "top": "relu2"
        },
        {
            "name": "fc3",
            "type": "InnerProduct",
            "bottom": "relu2",
            "top": "fc3",
            "fc_param": {
                "num_output": embedding_vec_size
            }
        },
        {
            "name": "relu3",
            "type": "ReLU",
            "bottom": "fc3",
            "top": "relu3"
        },
        {
            "name": "interaction1",
            "type": "Interaction",
            "bottom": [
                "relu3",
                "sparse_embedding1"
            ],
            "top": "interaction1"
        },
        {
            "name": "fc4",
            "type": "InnerProduct",
            "bottom": "interaction1",
            "top": "fc4",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu4",
            "type": "ReLU",
            "bottom": "fc4",
            "top": "relu4"
        },
        {
            "name": "fc5",
            "type": "InnerProduct",
            "bottom": "relu4",
            "top": "fc5",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu5",
            "type": "ReLU",
            "bottom": "fc5",
            "top": "relu5"
        },
        {
            "name": "fc6",
            "type": "InnerProduct",
            "bottom": "relu5",
            "top": "fc6",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu6",
            "type": "ReLU",
            "bottom": "fc6",
            "top": "relu6"
        },
        {
            "name": "fc7",
            "type": "InnerProduct",
            "bottom": "relu6",
            "top": "fc7",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu7",
            "type": "ReLU",
            "bottom": "fc7",
            "top": "relu7"
        },
        {
            "name": "fc8",
            "type": "InnerProduct",
            "bottom": "relu7",
            "top": "fc8",
            "fc_param": {
                "num_output": 1
            }
        },

      {
        "name": "sigmoid",
        "type": "Sigmoid",
        "bottom": ["fc8"],
        "top": "sigmoid"
      }
    ]
config = {
    "inference": inference,
    "layers": layers
}

with open(config_inference_file_path,'w') as f:
    json.dump(config,f,indent = 4)

Prepare validation set for inference

import pandas as pd

output_valid_path = os.path.join(BASE_DIR, "processed_nvt/valid")

nvtdata_test = pd.read_parquet(output_valid_path)
nvtdata_test.head()
con_feats = ['hist_count']

cat_feats = ['time_hour',
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',
 'time_minute',
 'time_second',
 'time_wd',
 'time_day',
 'time_day_week',
 'time']

For inference, HugeCTR expects the data in CSR format, which requires each categorical column to occupy its own integer key range.
For example, if there are 10 users and 10 items, HugeCTR expects the users to be encoded in the 1-10 range and the items in the 11-20 range, whereas NVTabular encodes both users and items in the 1-10 range.

For this reason, we need to shift the keys of the categorical variables produced by NVTabular to comply with HugeCTR. Note that the order of cat_feats must match the order of the cardinalities in embedding_size_str_simple_time, since the per-column shift below is computed as their cumulative sum.
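To make the shift concrete, here is a minimal toy example with two categorical columns of cardinality 10 each (the made-up user/item case from above):

import numpy as np

toy_sizes = [10, 10]                                    # e.g. 10 users, 10 items
toy_shift = np.insert(np.cumsum(toy_sizes), 0, 0)[:-1]  # -> array([ 0, 10])
# Adding toy_shift column-wise leaves the user keys in the 1-10 range and
# moves the item keys into the 11-20 range, giving each column its own range.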

import numpy as np

shift = np.insert(np.cumsum(embedding_size_str_simple_time), 0, 0)[:-1]
cat_data = nvtdata_test[cat_feats].values + shift
dense_data = nvtdata_test[con_feats].values

Create inference session

import sys
from mpi4py import MPI
from hugectr.inference import CreateParameterServer, CreateEmbeddingCache, InferenceSession

# Declare parameter server, embedding cache and inference session using inference config
parameter_server = CreateParameterServer([config_inference_file_path],["DLRM"],True)
embedding_cache = CreateEmbeddingCache(parameter_server,0,True,0.2,config_inference_file_path,"DLRM",True)
inference_session = InferenceSession(config_inference_file_path,0,embedding_cache)

# Define a function to perform batched inference
def infer_batch(inference_session, dense_data_batch, cat_data_batch):
    dense_features = list(dense_data_batch.flatten())
    embedding_columns = list(cat_data_batch.flatten())

    # With max_nnz = 1 every slot holds exactly one key, so the CSR row
    # offsets are simply 0, 1, 2, ..., len(embedding_columns)
    row_ptrs = list(range(0, len(embedding_columns) + 1))

    output = inference_session.predict(dense_features, embedding_columns, row_ptrs, True)
    return output
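Before scoring the full validation set, a quick smoke test on a handful of rows (using the dense_data and cat_data arrays prepared above) confirms the session is wired up correctly:

# Score the first 8 validation rows as a smoke test
sample_scores = infer_batch(inference_session, dense_data[:8], cat_data[:8])
print(sample_scores)      # eight sigmoid outputs, each between 0 and 1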

Now we are ready to carry out inference.

batch_size = 2048
num_batches = (len(dense_data) // batch_size) + 1
batch_idx = np.array_split(np.arange(len(dense_data)), num_batches)
labels = []

for batch_id in tqdm(batch_idx):
    dense_data_batch = dense_data[batch_id]
    cat_data_batch = cat_data[batch_id]
    results = infer_batch(inference_session, dense_data_batch, cat_data_batch)
    labels.extend(results)
# Extract ground truth to calculate AUC
ground_truth = nvtdata_test['label'].values
from sklearn.metrics import roc_auc_score

roc_auc_score(ground_truth, labels)

Step 9: Inference on 2nd validation set with HugeCTR

Following the same procedure as in the last step, let’s compute the AUC on the validation set with count and target encoded features.

config_inference_file_path = os.path.join(config_output_path, 'dlrm_fp32_count-target-encode_1gpu_inference.json')
weights_output_path = os.path.join(weights_path,'dlrm_fp32_count-target-encode_1gpu/')

inference = {
    "max_batchsize": 2048,
    "dense_model_file": weights_output_path+"/_dense_30000.model",
    "sparse_model_file": weights_output_path+"/0_sparse_30000.model",
    "input_key_type": "I64"
  }

layers = [
      {
            "name": "data",
            "type": "Data",
            "check": "None",
            "label": {
                "top": "label",
                "label_dim": 1
            },
            "dense": {
                "top": "dense",
                "dense_dim": 9
            },
            "sparse": [
                {
                    "top": "data1",
                    "type": "LocalizedSlot",
                    "max_feature_num_per_sample": len(embedding_size_str_count_encode),
                    "max_nnz": 1,
                    "slot_num": len(embedding_size_str_count_encode)
                }
            ]
        },
        {
            "name": "sparse_embedding1",
            "type": "LocalizedSlotSparseEmbeddingHash",
            "bottom": "data1",
            "top": "sparse_embedding1",
            "sparse_embedding_hparam": {
                "slot_size_array": embedding_size_str_count_encode,
                "embedding_vec_size": embedding_vec_size,
                "combiner": 0
            }
        },
        {
            "name": "fc1",
            "type": "InnerProduct",
            "bottom": "dense",
            "top": "fc1",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu1",
            "type": "ReLU",
            "bottom": "fc1",
            "top": "relu1"
        },
        {
            "name": "fc2",
            "type": "InnerProduct",
            "bottom": "relu1",
            "top": "fc2",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu2",
            "type": "ReLU",
            "bottom": "fc2",
            "top": "relu2"
        },
        {
            "name": "fc3",
            "type": "InnerProduct",
            "bottom": "relu2",
            "top": "fc3",
            "fc_param": {
                "num_output": embedding_vec_size
            }
        },
        {
            "name": "relu3",
            "type": "ReLU",
            "bottom": "fc3",
            "top": "relu3"
        },
        {
            "name": "interaction1",
            "type": "Interaction",
            "bottom": [
                "relu3",
                "sparse_embedding1"
            ],
            "top": "interaction1"
        },
        {
            "name": "fc4",
            "type": "InnerProduct",
            "bottom": "interaction1",
            "top": "fc4",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu4",
            "type": "ReLU",
            "bottom": "fc4",
            "top": "relu4"
        },
        {
            "name": "fc5",
            "type": "InnerProduct",
            "bottom": "relu4",
            "top": "fc5",
            "fc_param": {
                "num_output": 1024
            }
        },
        {
            "name": "relu5",
            "type": "ReLU",
            "bottom": "fc5",
            "top": "relu5"
        },
        {
            "name": "fc6",
            "type": "InnerProduct",
            "bottom": "relu5",
            "top": "fc6",
            "fc_param": {
                "num_output": 512
            }
        },
        {
            "name": "relu6",
            "type": "ReLU",
            "bottom": "fc6",
            "top": "relu6"
        },
        {
            "name": "fc7",
            "type": "InnerProduct",
            "bottom": "relu6",
            "top": "fc7",
            "fc_param": {
                "num_output": 256
            }
        },
        {
            "name": "relu7",
            "type": "ReLU",
            "bottom": "fc7",
            "top": "relu7"
        },
        {
            "name": "fc8",
            "type": "InnerProduct",
            "bottom": "relu7",
            "top": "fc8",
            "fc_param": {
                "num_output": 1
            }
        },

      {
        "name": "sigmoid",
        "type": "Sigmoid",
        "bottom": ["fc8"],
        "top": "sigmoid"
      }
    ]


config = {
    "inference": inference,
    "layers": layers
}

with open(config_inference_file_path,'w') as f:
    json.dump(config,f,indent = 4)
output_valid_path = os.path.join(BASE_DIR, "processed_ce-te/valid")

nvtdata_test = pd.read_parquet(output_valid_path)
nvtdata_test.head()
con_feats = [
 'TE_hist_cat_0_hist_cat_1_hist_cat_2_hist_cat_3_hist_cat_4_impr_cat_label_TE',
 'TE_hist_cat_1_hist_cat_2_hist_cat_3_hist_cat_4_hist_cat_5_impr_cat_label_TE',
 'TE_hist_cat_2_hist_cat_3_hist_cat_4_hist_cat_5_hist_cat_6_impr_cat_label_TE',
 'TE_hist_cat_3_hist_cat_4_hist_cat_5_hist_cat_6_hist_cat_7_impr_cat_label_TE',
 'TE_hist_cat_4_hist_cat_5_hist_cat_6_hist_cat_7_hist_cat_8_impr_cat_label_TE',
 'TE_hist_cat_5_hist_cat_6_hist_cat_7_hist_cat_8_hist_cat_9_impr_cat_label_TE',
 'hist_count',
 'impr_cat_count',
 'impr_subcat_count']
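# Nine continuous features (six target encoded plus three counts), matching
# dense_dim = 9 in the train and inference configs for this dataset.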

cat_feats = ['time',
 'hist_cat_0',
 'hist_subcat_0',
 'hist_cat_1',
 'hist_subcat_1',
 'hist_cat_2',
 'hist_subcat_2',
 'hist_cat_3',
 'hist_subcat_3',
 'hist_cat_4',
 'hist_subcat_4',
 'hist_cat_5',
 'hist_subcat_5',
 'hist_cat_6',
 'hist_subcat_6',
 'hist_cat_7',
 'hist_subcat_7',
 'hist_cat_8',
 'hist_subcat_8',
 'hist_cat_9',
 'hist_subcat_9',
 'impr_cat',
 'impr_subcat',
 'impression_id',
 'uid',
 'time_hour',
 'time_minute',
 'time_second',
 'time_wd',
 'time_day',
 'time_day_week']
shift = np.insert(np.cumsum(embedding_size_str_count_encode), 0, 0)[:-1]
cat_data = nvtdata_test[cat_feats].values + shift

dense_data = nvtdata_test[con_feats].values
# create parameter server, embedding cache and inference session
parameter_server = CreateParameterServer([config_inference_file_path],["DLRM"],True)
embedding_cache = CreateEmbeddingCache(parameter_server,0,True,0.2,config_inference_file_path,"DLRM",True)
inference_session = InferenceSession(config_inference_file_path,0,embedding_cache)

# Define a function to perform batched inference (same as in Step 8)

def infer_batch(inference_session, dense_data_batch, cat_data_batch):
    dense_features = list(dense_data_batch.flatten())
    embedding_columns = list(cat_data_batch.flatten())

    # With max_nnz = 1 every slot holds exactly one key, so the CSR row
    # offsets are simply 0, 1, 2, ..., len(embedding_columns)
    row_ptrs = list(range(0, len(embedding_columns) + 1))

    output = inference_session.predict(dense_features, embedding_columns, row_ptrs, True)
    return output

Now we are ready to carry out inference on this validation set.

batch_size = 2048
num_batches = (len(dense_data) // batch_size) + 1
batch_idx = np.array_split(np.arange(len(dense_data)), num_batches)
labels = []
for batch_id in tqdm(batch_idx):
    dense_data_batch = dense_data[batch_id]
    cat_data_batch = cat_data[batch_id]
    results = infer_batch(inference_session, dense_data_batch, cat_data_batch)
    labels.extend(results)
# Extract ground truth to calculate AUC

ground_truth = nvtdata_test['label'].values
roc_auc_score(ground_truth, labels)

Conclusion

In this tutorial notebook, we walked through the full pipeline, from data cleaning, pre-processing, and feature engineering to model training and inference, all using the Merlin framework. We hope this notebook proves helpful for building Recommender Systems on your own datasets as well.

Feel free to experiment with the various hyper-parameters on the feature engineering and model training side and share your results!