Merlin ETL, training, and inference with e-Commerce behavior data
Overview
In this tutorial, we use the eCommerce behavior data from multi category store from REES46 Marketing Platform as our dataset. This tutorial is built upon the NVIDIA RecSys 2020 tutorial.
This notebook provides the code to preprocess the dataset and generate the training, validation, and test sets for the remainder of the tutorial. We define our own goal and filter the dataset accordingly.
For our tutorial, we decided that our goal is to predict if a user purchased an item:
Positive: User purchased an item.
Negative: User added an item to the cart, but did not purchase it (in the same session).
We split the dataset into training, validation, and test set by the timestamp:
Training: October 2019 - February 2020
Validation: March 2020
Test: April 2020
We remove AddToCart Events from a session, if in the same session the same item was purchased.
Data
First, we download and unzip the raw data.
Note: the dataset is approximately 11 GB and can require several minutes to download.
%%bash
export HOME=$PWD
pip install gdown --user
~/.local/bin/gdown https://drive.google.com/uc?id=1-Rov9fFtGJqb7_ePc6qH-Rhzxn0cIcKB
~/.local/bin/gdown https://drive.google.com/uc?id=1-Rov9fFtGJqb7_ePc6qH-Rhzxn0cIcKB
~/.local/bin/gdown https://drive.google.com/uc?id=1zr_RXpGvOWN2PrWI6itWL8HnRsCpyqz8
~/.local/bin/gdown https://drive.google.com/uc?id=1g5WoIgLe05UMdREbxAjh0bEFgVCjA1UL
~/.local/bin/gdown https://drive.google.com/uc?id=1qZIwMbMgMmgDC5EoMdJ8aI9lQPsWA3-P
~/.local/bin/gdown https://drive.google.com/uc?id=1x5ohrrZNhWQN4Q-zww0RmXOwctKHH9PT
import glob
list_files = glob.glob('*.csv.gz')
list_files
['2019-Dec.csv.gz',
'2020-Apr.csv.gz',
'2020-Mar.csv.gz',
'2020-Feb.csv.gz',
'2020-Jan.csv.gz']
Data extraction and initial preprocessing
We extract a few relevant columns from the raw datasets and parse date columns into several atomic columns (day, month…).
import pandas as pd
import numpy as np
from tqdm import tqdm
def process_files(file):
df_tmp = pd.read_csv(file, compression='gzip')
df_tmp['session_purchase'] = df_tmp['user_session'] + '_' + df_tmp['product_id'].astype(str)
df_purchase = df_tmp[df_tmp['event_type']=='purchase']
df_cart = df_tmp[df_tmp['event_type']=='cart']
df_purchase = df_purchase[df_purchase['session_purchase'].isin(df_cart['session_purchase'])]
df_cart = df_cart[~(df_cart['session_purchase'].isin(df_purchase['session_purchase']))]
df_cart['target'] = 0
df_purchase['target'] = 1
df = pd.concat([df_cart, df_purchase])
df = df.drop('category_id', axis=1)
df = df.drop('session_purchase', axis=1)
df[['cat_0', 'cat_1', 'cat_2', 'cat_3']] = df['category_code'].str.split("\.", n = 3, expand = True).fillna('NA')
df['brand'] = df['brand'].fillna('NA')
df = df.drop('category_code', axis=1)
df['timestamp'] = pd.to_datetime(df['event_time'].str.replace(' UTC', ''))
df['ts_hour'] = df['timestamp'].dt.hour
df['ts_minute'] = df['timestamp'].dt.minute
df['ts_weekday'] = df['timestamp'].dt.weekday
df['ts_day'] = df['timestamp'].dt.day
df['ts_month'] = df['timestamp'].dt.month
df['ts_year'] = df['timestamp'].dt.year
df.to_csv('./dataset/' + file.replace('.gz', ''), index=False)
!mkdir ./dataset
for file in tqdm(list_files):
print(file)
process_files(file)
0%| | 0/5 [00:00<?, ?it/s]
2019-Dec.csv.gz
20%|██████████████▊ | 1/5 [04:16<17:05, 256.45s/it]
2020-Apr.csv.gz
40%|█████████████████████████████▌ | 2/5 [08:34<12:51, 257.29s/it]
2020-Mar.csv.gz
60%|████████████████████████████████████████████▍ | 3/5 [12:02<07:49, 234.67s/it]
2020-Feb.csv.gz
80%|███████████████████████████████████████████████████████████▏ | 4/5 [15:30<03:44, 224.22s/it]
2020-Jan.csv.gz
100%|██████████████████████████████████████████████████████████████████████████| 5/5 [19:05<00:00, 229.04s/it]
Prepare the training, validation, and test datasets
Next, we split the data into training, validation, and test sets. We use 3 months for training, 1 month for validation, and 1 month for testing.
lp = []
list_files = glob.glob('./dataset/*.csv')
!ls -l ./dataset/*.csv
-rw-r--r-- 1 root dip 479323170 Nov 16 22:47 ./dataset/2019-Dec.csv
-rw-r--r-- 1 root dip 455992639 Nov 16 22:51 ./dataset/2020-Apr.csv
-rw-r--r-- 1 root dip 453967664 Nov 16 22:58 ./dataset/2020-Feb.csv
-rw-r--r-- 1 root dip 375205173 Nov 16 23:02 ./dataset/2020-Jan.csv
-rw-r--r-- 1 root dip 403896607 Nov 16 22:55 ./dataset/2020-Mar.csv
for file in list_files:
lp.append(pd.read_csv(file))
df = pd.concat(lp)
df.shape
(13184044, 19)
df_test = df[df['ts_month']==4]
df_valid = df[df['ts_month']==3]
df_train = df[(df['ts_month']!=3)&(df['ts_month']!=4)]
df_train.shape, df_valid.shape, df_test.shape
((7949839, 19), (2461719, 19), (2772486, 19))
!mkdir -p ./data
df_train.to_parquet('./data/train.parquet', index=False)
df_valid.to_parquet('./data/valid.parquet', index=False)
df_test.to_parquet('./data/test.parquet', index=False)
df_train.head()
event_time | event_type | product_id | brand | price | user_id | user_session | target | cat_0 | cat_1 | cat_2 | cat_3 | timestamp | ts_hour | ts_minute | ts_weekday | ts_day | ts_month | ts_year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2020-02-01 00:00:18 UTC | cart | 100065078 | xiaomi | 568.61 | 526615078 | 5f0aab9f-f92e-4eff-b0d2-fcec5f553f01 | 0 | construction | tools | light | NaN | 2020-02-01 00:00:18 | 0 | 0 | 5 | 1 | 2 | 2020 |
1 | 2020-02-01 00:00:18 UTC | cart | 5701246 | NaN | 24.43 | 563902689 | 76cc9152-8a9f-43e9-b98a-ee484510f379 | 0 | electronics | video | tv | NaN | 2020-02-01 00:00:18 | 0 | 0 | 5 | 1 | 2 | 2020 |
2 | 2020-02-01 00:00:31 UTC | cart | 14701533 | NaN | 154.42 | 520953435 | 5f1c7752-cf92-41fc-9a16-e8897a90eee8 | 0 | electronics | video | projector | NaN | 2020-02-01 00:00:31 | 0 | 0 | 5 | 1 | 2 | 2020 |
3 | 2020-02-01 00:00:40 UTC | cart | 1004855 | xiaomi | 123.30 | 519236281 | e512f514-dc7f-4fc9-9042-e3955989d395 | 0 | construction | tools | light | NaN | 2020-02-01 00:00:40 | 0 | 0 | 5 | 1 | 2 | 2020 |
4 | 2020-02-01 00:00:47 UTC | cart | 1005100 | samsung | 140.28 | 550305600 | bd7a37b6-420d-4575-8852-ac825aff39b5 | 0 | construction | tools | light | NaN | 2020-02-01 00:00:47 | 0 | 0 | 5 | 1 | 2 | 2020 |
Preprocessing with NVTabular
Next, we will use NVTabular for preprocessing and engineering more features.
But first, we need to import the necessary libraries and initialize a Dask GPU cluster for computation.
Initialize Dask GPU cluster
# Standard Libraries
import os
from time import time
import re
import shutil
import glob
import warnings
# External Dependencies
import numpy as np
import pandas as pd
import cupy as cp
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.utils import parse_bytes
from dask.delayed import delayed
import rmm
# NVTabular
import nvtabular as nvt
import nvtabular.ops as ops
from nvtabular.io import Shuffle
from nvtabular.utils import _pynvml_mem_size, device_mem_size
print(nvt.__version__)
0.7.1
# define some information about where to get our data
BASE_DIR = "./nvtabular_temp"
!rm -r $BASE_DIR && mkdir $BASE_DIR
input_path = './dataset'
dask_workdir = os.path.join(BASE_DIR, "workdir")
output_path = os.path.join(BASE_DIR, "output")
stats_path = os.path.join(BASE_DIR, "stats")
rm: cannot remove './nvtabular_temp': No such file or directory
This example was tested on a DGX server with 8 GPUs. If you have less GPUs, modify the NUM_GPUS
variable accordingly.
NUM_GPUS = [0,1,2,3,4,5,6,7]
#NUM_GPUS = [0]
# Dask dashboard
dashboard_port = "8787"
# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp" # "tcp" or "ucx"
visible_devices = ",".join([str(n) for n in NUM_GPUS]) # Delect devices to place workers
device_limit_frac = 0.5 # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.6
part_mem_frac = 0.05
# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)
# Check if any device memory is already occupied
"""
for dev in visible_devices.split(","):
fmem = _pynvml_mem_size(kind="free", index=int(dev))
used = (device_size - fmem) / 1e9
if used > 1.0:
warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")
"""
cluster = None # (Optional) Specify existing scheduler port
if cluster is None:
cluster = LocalCUDACluster(
protocol = protocol,
n_workers=len(visible_devices.split(",")),
CUDA_VISIBLE_DEVICES = visible_devices,
device_memory_limit = device_limit,
local_directory=dask_workdir,
dashboard_address=":" + dashboard_port,
)
# Create the distributed client
client = Client(cluster)
client
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
Client
Client-5fa9de34-4731-11ec-81a0-0242c0a88002
Connection method: Cluster object | Cluster type: LocalCUDACluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCUDACluster
280d3dd0
Status: running | Using processes: True |
Dashboard: http://127.0.0.1:8787/status | Workers: 8 |
Total threads: 8 | Total memory: 503.81 GiB |
Scheduler Info
Scheduler
Scheduler-e2967024-5ba6-46fa-889b-ce05595cfe32
Comm: tcp://127.0.0.1:42281 | Workers: 8 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 8 |
Started: Just now | Total memory: 503.81 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:45134 | Total threads: 1 |
Dashboard: http://127.0.0.1:45519/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:40585 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-m2ipvyh1 | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 1
Comm: tcp://127.0.0.1:45442 | Total threads: 1 |
Dashboard: http://127.0.0.1:36563/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:33615 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-hhdw5hgq | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 2
Comm: tcp://127.0.0.1:38832 | Total threads: 1 |
Dashboard: http://127.0.0.1:45537/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:34857 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-3xv1lvg4 | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 3
Comm: tcp://127.0.0.1:33468 | Total threads: 1 |
Dashboard: http://127.0.0.1:34645/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:33516 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-_v4e6b68 | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 4
Comm: tcp://127.0.0.1:38052 | Total threads: 1 |
Dashboard: http://127.0.0.1:33234/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:42282 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-lbkl0_sc | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 5
Comm: tcp://127.0.0.1:46068 | Total threads: 1 |
Dashboard: http://127.0.0.1:34702/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:39148 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-47f2dcfj | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 6
Comm: tcp://127.0.0.1:41440 | Total threads: 1 |
Dashboard: http://127.0.0.1:33288/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:36099 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-qq2t20ws | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
Worker: 7
Comm: tcp://127.0.0.1:43583 | Total threads: 1 |
Dashboard: http://127.0.0.1:45175/status | Memory: 62.98 GiB |
Nanny: tcp://127.0.0.1:35315 | |
Local directory: /hugectr/notebooks/nvtabular_temp/workdir/dask-worker-space/worker-1v3qmqn5 | |
GPU: Tesla P100-SXM2-16GB | GPU memory: 15.90 GiB |
!nvidia-smi
Tue Nov 16 23:03:13 2021
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.06 Driver Version: 450.51.06 CUDA Version: 11.4 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 41C P0 44W / 300W | 508MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 38C P0 41W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 38C P0 44W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 39C P0 45W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 43C P0 43W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 44C P0 44W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 39C P0 44W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 38C P0 42W / 300W | 255MiB / 16280MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
# Initialize RMM pool on ALL workers
def _rmm_pool():
rmm.reinitialize(
# RMM may require the pool size to be a multiple of 256.
pool_allocator=True,
initial_pool_size=(device_pool_size // 256) * 256, # Use default size
)
client.run(_rmm_pool)
{'tcp://127.0.0.1:33468': None,
'tcp://127.0.0.1:38052': None,
'tcp://127.0.0.1:38832': None,
'tcp://127.0.0.1:41440': None,
'tcp://127.0.0.1:43583': None,
'tcp://127.0.0.1:45134': None,
'tcp://127.0.0.1:45442': None,
'tcp://127.0.0.1:46068': None}
Define NVTabular dataset
train_paths = glob.glob('./data/train.parquet')
valid_paths = glob.glob('./data/valid.parquet')
test_paths = glob.glob('./data/test.parquet')
train_dataset = nvt.Dataset(train_paths, engine='parquet', part_mem_fraction=0.15)
valid_dataset = nvt.Dataset(valid_paths, engine='parquet', part_mem_fraction=0.15)
test_dataset = nvt.Dataset(test_paths, engine='parquet', part_mem_fraction=0.15)
train_dataset.to_ddf().head()
event_time | event_type | product_id | brand | price | user_id | user_session | target | cat_0 | cat_1 | cat_2 | cat_3 | timestamp | ts_hour | ts_minute | ts_weekday | ts_day | ts_month | ts_year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2020-02-01 00:00:18 UTC | cart | 100065078 | xiaomi | 568.61 | 526615078 | 5f0aab9f-f92e-4eff-b0d2-fcec5f553f01 | 0 | construction | tools | light | <NA> | 2020-02-01 00:00:18 | 0 | 0 | 5 | 1 | 2 | 2020 |
1 | 2020-02-01 00:00:18 UTC | cart | 5701246 | <NA> | 24.43 | 563902689 | 76cc9152-8a9f-43e9-b98a-ee484510f379 | 0 | electronics | video | tv | <NA> | 2020-02-01 00:00:18 | 0 | 0 | 5 | 1 | 2 | 2020 |
2 | 2020-02-01 00:00:31 UTC | cart | 14701533 | <NA> | 154.42 | 520953435 | 5f1c7752-cf92-41fc-9a16-e8897a90eee8 | 0 | electronics | video | projector | <NA> | 2020-02-01 00:00:31 | 0 | 0 | 5 | 1 | 2 | 2020 |
3 | 2020-02-01 00:00:40 UTC | cart | 1004855 | xiaomi | 123.30 | 519236281 | e512f514-dc7f-4fc9-9042-e3955989d395 | 0 | construction | tools | light | <NA> | 2020-02-01 00:00:40 | 0 | 0 | 5 | 1 | 2 | 2020 |
4 | 2020-02-01 00:00:47 UTC | cart | 1005100 | samsung | 140.28 | 550305600 | bd7a37b6-420d-4575-8852-ac825aff39b5 | 0 | construction | tools | light | <NA> | 2020-02-01 00:00:47 | 0 | 0 | 5 | 1 | 2 | 2020 |
len(train_dataset.to_ddf().columns)
19
train_dataset.to_ddf().columns
Index(['event_time', 'event_type', 'product_id', 'brand', 'price', 'user_id',
'user_session', 'target', 'cat_0', 'cat_1', 'cat_2', 'cat_3',
'timestamp', 'ts_hour', 'ts_minute', 'ts_weekday', 'ts_day', 'ts_month',
'ts_year'],
dtype='object')
len(train_dataset.to_ddf())
7949839
Preprocessing and feature engineering
In this notebook we will explore a few feature engineering technique with NVTabular:
Creating cross features, e.g.
user_id
and'brand
Target encoding
The engineered features will then be preprocessed into a form suitable for machine learning model:
Fill missing values
Encoding categorical features into integer values
Normalization of numeric features
from nvtabular.ops import LambdaOp
# cross features
def user_id_cross_maker(col, gdf):
return col.astype(str) + '_' + gdf['user_id'].astype(str)
user_id_cross_features = (
nvt.ColumnGroup(['product_id', 'brand', 'ts_hour', 'ts_minute']) >>
LambdaOp(user_id_cross_maker, dependency=['user_id']) >>
nvt.ops.Rename(postfix = '_user_id_cross')
)
def user_id_brand_cross_maker(col, gdf):
return col.astype(str) + '_' + gdf['user_id'].astype(str) + '_' + gdf['brand'].astype(str)
user_id_brand_cross_features = (
nvt.ColumnGroup(['ts_hour', 'ts_weekday', 'cat_0', 'cat_1', 'cat_2']) >>
LambdaOp(user_id_brand_cross_maker, dependency=['user_id', 'brand']) >>
nvt.ops.Rename(postfix = '_user_id_brand_cross')
)
target_encode = (
['brand', 'user_id', 'product_id', 'cat_2', ['ts_weekday', 'ts_day']] >>
nvt.ops.TargetEncoding(
nvt.ColumnGroup('target'),
kfold=5,
p_smooth=20,
out_dtype="float32",
)
)
cat_feats = (user_id_brand_cross_features + user_id_cross_features) >> nvt.ops.Categorify()
cont_feats = ['price', 'ts_weekday', 'ts_day', 'ts_month'] >> nvt.ops.FillMissing() >> nvt.ops.Normalize()
cont_feats += target_encode >> nvt.ops.Rename(postfix = '_TE')
output = cat_feats + cont_feats + 'target'
proc = nvt.Workflow(output)
/nvtabular/nvtabular/workflow/workflow.py:89: UserWarning: A global dask.distributed client has been detected, but the single-threaded scheduler will be used for execution. Please use the `client` argument to initialize a `Workflow` object with distributed-execution enabled.
warnings.warn(
Executing the workflow
After having defined the workflow, calling the fit()
method will start the actual computation to record the required statistics from the training data.
%%time
time_preproc_start = time()
proc.fit(train_dataset)
time_preproc = time()-time_preproc_start
/usr/local/lib/python3.8/dist-packages/numba/cuda/compiler.py:865: NumbaPerformanceWarning: Grid size (1) < 2 * SM count (112) will likely result in GPU under utilization due to low occupancy.
warn(NumbaPerformanceWarning(msg))
CPU times: user 13.3 s, sys: 10.1 s, total: 23.4 s
Wall time: 26.8 s
cat_feats.output_columns.names
['ts_hour_user_id_brand_cross',
'ts_weekday_user_id_brand_cross',
'cat_0_user_id_brand_cross',
'cat_1_user_id_brand_cross',
'cat_2_user_id_brand_cross',
'product_id_user_id_cross',
'brand_user_id_cross',
'ts_hour_user_id_cross',
'ts_minute_user_id_cross']
output.output_columns.names
['ts_hour_user_id_brand_cross',
'ts_weekday_user_id_brand_cross',
'cat_0_user_id_brand_cross',
'cat_1_user_id_brand_cross',
'cat_2_user_id_brand_cross',
'product_id_user_id_cross',
'brand_user_id_cross',
'ts_hour_user_id_cross',
'ts_minute_user_id_cross',
'price',
'ts_weekday',
'ts_day',
'ts_month',
'TE_brand_target_TE',
'TE_user_id_target_TE',
'TE_product_id_target_TE',
'TE_cat_2_target_TE',
'TE_ts_weekday_ts_day_target_TE',
'target']
CAT_FEATS = ['ts_hour_user_id_brand_cross',
'ts_weekday_user_id_brand_cross',
'cat_0_user_id_brand_cross',
'cat_1_user_id_brand_cross',
'cat_2_user_id_brand_cross',
'product_id_user_id_cross',
'brand_user_id_cross',
'ts_hour_user_id_cross',
'ts_minute_user_id_cross',]
CON_FEATS = ['price',
'ts_weekday',
'ts_day',
'ts_month',
'TE_brand_target_TE',
'TE_user_id_target_TE',
'TE_product_id_target_TE',
'TE_cat_2_target_TE',
'TE_ts_weekday_ts_day_target_TE']
dict_dtypes = {}
for col in CAT_FEATS:
dict_dtypes[col] = np.int64
for col in CON_FEATS:
dict_dtypes[col] = np.float32
dict_dtypes['target'] = np.float32
Next, we call the transform()
method to transform the datasets.
output_train_dir = os.path.join(output_path, 'train/')
output_valid_dir = os.path.join(output_path, 'valid/')
output_test_dir = os.path.join(output_path, 'test/')
! rm -rf $output_train_dir && mkdir -p $output_train_dir
! rm -rf $output_valid_dir && mkdir -p $output_valid_dir
! rm -rf $output_test_dir && mkdir -p $output_test_dir
%%time
time_preproc_start = time()
proc.transform(train_dataset).to_parquet(output_path=output_train_dir, dtypes=dict_dtypes,
shuffle=nvt.io.Shuffle.PER_PARTITION,
cats=CAT_FEATS,
conts=CON_FEATS,
labels=['target'])
time_preproc += time()-time_preproc_start
/nvtabular/nvtabular/io/dask.py:375: UserWarning: A global dask.distributed client has been detected, but the single-threaded scheduler will be used for this write operation. Please use the `client` argument to initialize a `Dataset` and/or `Workflow` object with distributed-execution enabled.
warnings.warn(
CPU times: user 1.6 s, sys: 3.29 s, total: 4.89 s
Wall time: 5.79 s
!ls -l $output_train_dir
total 366131
-rw-r--r-- 1 root dip 47 Nov 16 23:04 _file_list.txt
-rw-r--r-- 1 root dip 18283 Nov 16 23:04 _metadata
-rw-r--r-- 1 root dip 1045 Nov 16 23:04 _metadata.json
-rw-r--r-- 1 root dip 706364298 Nov 16 23:04 part_0.parquet
-rw-r--r-- 1 root dip 7975 Nov 16 23:04 schema.pbtxt
%%time
time_preproc_start = time()
proc.transform(valid_dataset).to_parquet(output_path=output_valid_dir, dtypes=dict_dtypes,
shuffle=nvt.io.Shuffle.PER_PARTITION,
cats=CAT_FEATS,
conts=CON_FEATS,
labels=['target'])
time_preproc += time()-time_preproc_start
CPU times: user 1.06 s, sys: 1.57 s, total: 2.63 s
Wall time: 2.83 s
!ls -l $output_valid_dir
total 100979
-rw-r--r-- 1 root dip 47 Nov 16 23:04 _file_list.txt
-rw-r--r-- 1 root dip 8983 Nov 16 23:04 _metadata
-rw-r--r-- 1 root dip 1045 Nov 16 23:04 _metadata.json
-rw-r--r-- 1 root dip 92826604 Nov 16 23:04 part_0.parquet
-rw-r--r-- 1 root dip 7975 Nov 16 23:04 schema.pbtxt
%%time
time_preproc_start = time()
proc.transform(test_dataset).to_parquet(output_path=output_test_dir, dtypes=dict_dtypes,
shuffle=nvt.io.Shuffle.PER_PARTITION,
cats=CAT_FEATS,
conts=CON_FEATS,
labels=['target'])
time_preproc += time()-time_preproc_start
CPU times: user 1.05 s, sys: 1.64 s, total: 2.69 s
Wall time: 2.75 s
time_preproc
38.198790550231934
Verify the preprocessed data
Let’s quickly read the data back and verify that all fields have the expected format.
!ls $output_train_dir
_file_list.txt _metadata _metadata.json part_0.parquet schema.pbtxt
nvtdata = pd.read_parquet(output_train_dir+'/part_0.parquet')
nvtdata.head()
ts_hour_user_id_brand_cross | ts_weekday_user_id_brand_cross | cat_0_user_id_brand_cross | cat_1_user_id_brand_cross | cat_2_user_id_brand_cross | product_id_user_id_cross | brand_user_id_cross | ts_hour_user_id_cross | ts_minute_user_id_cross | price | ts_weekday | ts_day | ts_month | TE_brand_target_TE | TE_user_id_target_TE | TE_product_id_target_TE | TE_cat_2_target_TE | TE_ts_weekday_ts_day_target_TE | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 817883 | 908980 | 0 | 0 | 0 | 1085846 | 855303 | 144463 | 5417827 | -0.725652 | -0.502085 | 0.235441 | 1.314784 | 0.199788 | 0.325234 | 0.227236 | -1.284867 | 0.387063 | 0.0 |
1 | 4152058 | 2732403 | 1210052 | 712779 | 0 | 1360101 | 954962 | 731363 | 2230166 | -0.836849 | -0.007929 | -0.802043 | -0.864342 | 0.355313 | 0.266837 | 0.255486 | -1.285741 | 0.420646 | 0.0 |
2 | 3204608 | 274365 | 30730 | 31144 | 25505 | 29457 | 32720 | 3039842 | 3062261 | -0.184922 | -0.007929 | 1.618752 | -0.864342 | 0.466206 | 0.237990 | 0.414308 | 0.459563 | 0.239809 | 0.0 |
3 | 0 | 0 | 0 | 0 | 0 | 3464677 | 0 | 2467278 | 2493129 | -0.841169 | -0.007929 | 0.465993 | -0.666240 | -1.285569 | 0.390281 | 0.318047 | -1.288352 | 0.376334 | 0.0 |
4 | 2665639 | 66327 | 19261 | 19397 | 16204 | 2497109 | 16447 | 2620458 | 2349810 | 3.510283 | 0.486228 | 0.581269 | -0.666240 | 0.446405 | 0.533477 | 0.492186 | 0.459563 | 0.388496 | 0.0 |
!ls $output_valid_dir
_file_list.txt _metadata _metadata.json part_0.parquet schema.pbtxt
nvtdata_valid = pd.read_parquet(output_valid_dir+'/part_0.parquet')
nvtdata_valid.head()
ts_hour_user_id_brand_cross | ts_weekday_user_id_brand_cross | cat_0_user_id_brand_cross | cat_1_user_id_brand_cross | cat_2_user_id_brand_cross | product_id_user_id_cross | brand_user_id_cross | ts_hour_user_id_cross | ts_minute_user_id_cross | price | ts_weekday | ts_day | ts_month | TE_brand_target_TE | TE_user_id_target_TE | TE_product_id_target_TE | TE_cat_2_target_TE | TE_ts_weekday_ts_day_target_TE | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1.107537 | 0.980384 | -0.225663 | -0.468138 | 0.372078 | 0.390281 | 0.427259 | 0.390176 | 0.353950 | 0.0 |
1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.840005 | 0.980384 | -0.225663 | -0.468138 | 0.364968 | 0.390281 | 0.320797 | -1.284867 | 0.352739 | 0.0 |
2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.327548 | -1.490397 | -0.802043 | -0.468138 | 0.466705 | 0.390281 | 0.498779 | 0.459734 | 0.380590 | 1.0 |
3 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 2.291189 | -1.490397 | -1.608975 | -0.468138 | 0.446405 | 0.390281 | 0.303992 | 0.459563 | 0.408594 | 0.0 |
4 | 0 | 0 | 2677742 | 1812225 | 1033276 | 3759307 | 2828489 | 0 | 0 | -0.738273 | -0.007929 | 1.157649 | -0.468138 | 0.277334 | 0.390281 | 0.326134 | 0.257637 | 0.409076 | 0.0 |
sum(nvtdata_valid['ts_hour_user_id_brand_cross']==0)
2359020
len(nvtdata_valid)
2461719
Getting the embedding size
Next, we need to get the embedding size for the categorical variables. This is an important input for defining the embedding table size to be used by HugeCTR.
embeddings = ops.get_embedding_sizes(proc)
embeddings
{'ts_hour_user_id_brand_cross': (4427037, 512),
'ts_weekday_user_id_brand_cross': (3961156, 512),
'cat_0_user_id_brand_cross': (2877223, 512),
'cat_1_user_id_brand_cross': (2890639, 512),
'cat_2_user_id_brand_cross': (2159304, 512),
'product_id_user_id_cross': (4398425, 512),
'brand_user_id_cross': (3009092, 512),
'ts_hour_user_id_cross': (3999369, 512),
'ts_minute_user_id_cross': (5931061, 512)}
print([embeddings[x][0] for x in cat_feats.output_columns.names])
[4427037, 3961156, 2877223, 2890639, 2159304, 4398425, 3009092, 3999369, 5931061]
cat_feats.output_columns.names
['ts_hour_user_id_brand_cross',
'ts_weekday_user_id_brand_cross',
'cat_0_user_id_brand_cross',
'cat_1_user_id_brand_cross',
'cat_2_user_id_brand_cross',
'product_id_user_id_cross',
'brand_user_id_cross',
'ts_hour_user_id_cross',
'ts_minute_user_id_cross']
embedding_size_str = "{}".format([embeddings[x][0] for x in cat_feats.output_columns.names])
embedding_size_str
'[4427037, 3961156, 2877223, 2890639, 2159304, 4398425, 3009092, 3999369, 5931061]'
num_con_feates = len(CON_FEATS)
num_con_feates
9
print([embeddings[x][0] for x in cat_feats.output_columns.names])
[4427037, 3961156, 2877223, 2890639, 2159304, 4398425, 3009092, 3999369, 5931061]
Next, we’ll shutdown our Dask client from earlier to free up some memory so that we can share it with HugeCTR.
client.shutdown()
cluster.close()
Preparing the training Python script for HugeCTR
The HugeCTR model can be defined by Python API. The following Python script defines a DLRM model and specifies the training resources.
Several parameters that need to be edited to match this dataset are:
slot_size_array
: cardinalities for the categorical variablesdense_dim
: number of dense featuresslot_num
: number of categorical variables
The model graph can be saved into a JSON file by calling model.graph_to_json
, which will be used for inference afterwards.
In the following code, we train the network using 8 GPUs and a workspace of 4000 MB per GPU. Note that the total embedding size is 33653306*128*4/(1024**3)
= 16.432 GB.
%%writefile hugectr_dlrm_ecommerce.py
import hugectr
from mpi4py import MPI
solver = hugectr.CreateSolver(max_eval_batches = 2720,
batchsize_eval = 16384,
batchsize = 16384,
lr = 0.1,
warmup_steps = 8000,
decay_start = 48000,
decay_steps = 24000,
vvgpu = [[0,1,2,3,4,5,6,7]],
repeat_dataset = True,
i64_input_key = True)
reader = hugectr.DataReaderParams(data_reader_type = hugectr.DataReaderType_t.Parquet,
source = ["./nvtabular_temp/output/train/_file_list.txt"],
eval_source = "./nvtabular_temp/output/valid/_file_list.txt",
check_type = hugectr.Check_t.Non,
slot_size_array = [4427037, 3961156, 2877223, 2890639, 2159304, 4398425, 3009092, 3999369, 5931061])
optimizer = hugectr.CreateOptimizer(optimizer_type = hugectr.Optimizer_t.SGD,
update_type = hugectr.Update_t.Local,
atomic_update = True)
model = hugectr.Model(solver, reader, optimizer)
model.add(hugectr.Input(label_dim = 1, label_name = "label",
dense_dim = 9, dense_name = "dense",
data_reader_sparse_param_array =
[hugectr.DataReaderSparseParam("data1", 1, True, 9)]))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
workspace_size_per_gpu_in_mb = 4000,
embedding_vec_size = 128,
combiner = 'sum',
sparse_embedding_name = "sparse_embedding1",
bottom_name = "data1",
optimizer = optimizer))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["dense"],
top_names = ["fc1"],
num_output=512))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc1"],
top_names = ["relu1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu1"],
top_names = ["fc2"],
num_output=256))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc2"],
top_names = ["relu2"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu2"],
top_names = ["fc3"],
num_output=128))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc3"],
top_names = ["relu3"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Interaction,
bottom_names = ["relu3","sparse_embedding1"],
top_names = ["interaction1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["interaction1"],
top_names = ["fc4"],
num_output=1024))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc4"],
top_names = ["relu4"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu4"],
top_names = ["fc5"],
num_output=1024))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc5"],
top_names = ["relu5"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu5"],
top_names = ["fc6"],
num_output=512))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc6"],
top_names = ["relu6"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu6"],
top_names = ["fc7"],
num_output=256))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
bottom_names = ["fc7"],
top_names = ["relu7"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
bottom_names = ["relu7"],
top_names = ["fc8"],
num_output=1))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.BinaryCrossEntropyLoss,
bottom_names = ["fc8", "label"],
top_names = ["loss"]))
model.compile()
model.summary()
model.graph_to_json(graph_config_file = "dlrm_ecommerce.json")
model.fit(max_iter = 12000, display = 1000, eval_interval = 3000, snapshot = 10000, snapshot_prefix = "./")
Overwriting hugectr_dlrm_ecommerce.py
HugeCTR training
Now we are ready to train a DLRM model with HugeCTR.
!python3 hugectr_dlrm_ecommerce.py
HugeCTR Version: 3.2
====================================================Model Init=====================================================
[HUGECTR][23:17:45][INFO][RANK0]: Global seed is 1985961998
[HUGECTR][23:17:45][INFO][RANK0]: Device to NUMA mapping:
GPU 0 -> node 0
GPU 1 -> node 0
GPU 2 -> node 0
GPU 3 -> node 0
GPU 4 -> node 1
GPU 5 -> node 1
GPU 6 -> node 1
GPU 7 -> node 1
[HUGECTR][23:17:54][WARNING][RANK0]: Peer-to-peer access cannot be fully enabled.
[HUGECTR][23:17:54][INFO][RANK0]: Start all2all warmup
[HUGECTR][23:17:54][INFO][RANK0]: End all2all warmup
[HUGECTR][23:17:54][INFO][RANK0]: Using All-reduce algorithm: NCCL
[HUGECTR][23:17:54][INFO][RANK0]: Device 0: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 1: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 2: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 3: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 4: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 5: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 6: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: Device 7: Tesla P100-SXM2-16GB
[HUGECTR][23:17:54][INFO][RANK0]: num of DataReader workers: 8
[HUGECTR][23:17:55][INFO][RANK0]: Vocabulary size: 33653306
[HUGECTR][23:17:55][INFO][RANK0]: max_vocabulary_size_per_gpu_=8192000
[HUGECTR][23:17:58][INFO][RANK0]: Graph analysis to resolve tensor dependency
===================================================Model Compile===================================================
[HUGECTR][23:18:12][INFO][RANK0]: gpu0 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu1 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu6 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu2 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu7 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu5 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu3 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu4 start to init embedding
[HUGECTR][23:18:12][INFO][RANK0]: gpu0 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu1 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu2 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu7 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu6 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu5 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu4 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: gpu3 init embedding done
[HUGECTR][23:18:12][INFO][RANK0]: Starting AUC NCCL warm-up
[HUGECTR][23:18:12][INFO][RANK0]: Warm-up done
===================================================Model Summary===================================================
label Dense Sparse
label dense data1
(None, 1) (None, 9)
------------------------------------------------------------------------------------------------------------------
Layer Type Input Name Output Name Output Shape
------------------------------------------------------------------------------------------------------------------
DistributedSlotSparseEmbeddingHash data1 sparse_embedding1 (None, 9, 128)
InnerProduct dense fc1 (None, 512)
ReLU fc1 relu1 (None, 512)
InnerProduct relu1 fc2 (None, 256)
ReLU fc2 relu2 (None, 256)
InnerProduct relu2 fc3 (None, 128)
ReLU fc3 relu3 (None, 128)
Interaction relu3,sparse_embedding1 interaction1 (None, 174)
InnerProduct interaction1 fc4 (None, 1024)
ReLU fc4 relu4 (None, 1024)
InnerProduct relu4 fc5 (None, 1024)
ReLU fc5 relu5 (None, 1024)
InnerProduct relu5 fc6 (None, 512)
ReLU fc6 relu6 (None, 512)
InnerProduct relu6 fc7 (None, 256)
ReLU fc7 relu7 (None, 256)
InnerProduct relu7 fc8 (None, 1)
BinaryCrossEntropyLoss fc8,label loss
------------------------------------------------------------------------------------------------------------------
[HUGECTR][23:18:12][INFO][RANK0]: Save the model graph to dlrm_ecommerce.json successfully
=====================================================Model Fit=====================================================
[HUGECTR][23:18:12][INFO][RANK0]: Use non-epoch mode with number of iterations: 12000
[HUGECTR][23:18:12][INFO][RANK0]: Training batchsize: 16384, evaluation batchsize: 16384
[HUGECTR][23:18:12][INFO][RANK0]: Evaluation interval: 3000, snapshot interval: 10000
[HUGECTR][23:18:12][INFO][RANK0]: Sparse embedding trainable: 1, dense network trainable: 1
[HUGECTR][23:18:12][INFO][RANK0]: Use mixed precision: 0, scaler: 1, use cuda graph: -875196854
[HUGECTR][23:18:12][INFO][RANK0]: lr: 0.100000, warmup_steps: 8000, decay_start: 48000, decay_steps: 24000, decay_power: 2.000000, end_lr: 0.000000
[HUGECTR][23:18:12][INFO][RANK0]: Training source file: ./nvtabular_temp/output/train/_file_list.txt
[HUGECTR][23:18:12][INFO][RANK0]: Evaluation source file: ./nvtabular_temp/output/valid/_file_list.txt
[HUGECTR][23:18:20][INFO][RANK0]: Iter: 1000 Time(1000 iters): 8.477706s Loss: 0.654302 lr:0.012512
[HUGECTR][23:18:29][INFO][RANK0]: Iter: 2000 Time(1000 iters): 8.461642s Loss: 0.537260 lr:0.025013
[HUGECTR][23:18:37][INFO][RANK0]: Iter: 3000 Time(1000 iters): 8.473848s Loss: 0.523659 lr:0.037512
[HUGECTR][23:18:47][INFO][RANK0]: Evaluation, AUC: 0.652278
[HUGECTR][23:18:47][INFO][RANK0]: Eval Time for 2720 iters: 9.794543s
[HUGECTR][23:18:56][INFO][RANK0]: Iter: 4000 Time(1000 iters): 18.361339s Loss: 0.521578 lr:0.050012
[HUGECTR][23:19:04][INFO][RANK0]: Iter: 5000 Time(1000 iters): 8.492043s Loss: 0.515692 lr:0.062513
[HUGECTR][23:19:13][INFO][RANK0]: Iter: 6000 Time(1000 iters): 8.491605s Loss: 0.518826 lr:0.075013
[HUGECTR][23:19:22][INFO][RANK0]: Evaluation, AUC: 0.650539
[HUGECTR][23:19:22][INFO][RANK0]: Eval Time for 2720 iters: 9.814989s
[HUGECTR][23:19:31][INFO][RANK0]: Iter: 7000 Time(1000 iters): 18.332924s Loss: 0.511855 lr:0.087513
[HUGECTR][23:19:39][INFO][RANK0]: Iter: 8000 Time(1000 iters): 8.488666s Loss: 0.515189 lr:0.100000
[HUGECTR][23:19:48][INFO][RANK0]: Iter: 9000 Time(1000 iters): 8.455840s Loss: 0.513654 lr:0.100000
[HUGECTR][23:19:58][INFO][RANK0]: Evaluation, AUC: 0.645823
[HUGECTR][23:19:58][INFO][RANK0]: Eval Time for 2720 iters: 9.750920s
[HUGECTR][23:20:06][INFO][RANK0]: Iter: 10000 Time(1000 iters): 18.285750s Loss: 0.518827 lr:0.100000
[HUGECTR][23:20:15][INFO][RANK0]: Rank0: Write hash table to file
[HUGECTR][23:21:26][INFO][RANK0]: Dumping sparse weights to files, successful
[HUGECTR][23:21:26][INFO][RANK0]: Dumping sparse optimzer states to files, successful
[HUGECTR][23:21:26][INFO][RANK0]: Dumping dense weights to file, successful
[HUGECTR][23:21:26][INFO][RANK0]: Dumping dense optimizer states to file, successful
[HUGECTR][23:21:26][INFO][RANK0]: Dumping untrainable weights to file, successful
[HUGECTR][23:21:35][INFO][RANK0]: Iter: 11000 Time(1000 iters): 88.781702s Loss: 0.511783 lr:0.100000
[HUGECTR][23:21:43][INFO][RANK0]: Finish 12000 iterations with batchsize: 16384 in 211.67s.
HugeCTR inference
In this section, we read the test dataset and compute the AUC value.
We will utilize the saved model graph in JSON format for inference.
Prepare the inference session
import sys
from hugectr.inference import InferenceParams, CreateInferenceSession
from mpi4py import MPI
# create inference session
inference_params = InferenceParams(model_name = "dlrm",
max_batchsize = 4096,
hit_rate_threshold = 0.6,
dense_model_file = "./_dense_10000.model",
sparse_model_files = ["./0_sparse_10000.model"],
device_id = 0,
use_gpu_embedding_cache = True,
cache_size_percentage = 0.2,
i64_input_key = True)
inference_session = CreateInferenceSession("dlrm_ecommerce.json", inference_params)
[HUGECTR][23:21:46][INFO][RANK0]: default_emb_vec_value is not specified using default: 0.000000
[HUGECTR][23:21:46][INFO][RANK0]: Created parallel (16 partitions) blank database backend in local memory!
[HUGECTR][23:22:51][INFO][RANK0]: ParallelLocalMemory backend. Table: dlrm#0. Inserted 33653303 / 33653303 pairs.
[HUGECTR][23:22:51][INFO][RANK0]: Cached 0.000000 * 33653303 embeddings in CPU memory database!
[HUGECTR][23:22:52][INFO][RANK0]: Create embedding cache in device 0.
[HUGECTR][23:22:53][INFO][RANK0]: create_refreshspace2
[HUGECTR][23:22:53][INFO][RANK0]: Global seed is 813179416
[HUGECTR][23:22:53][INFO][RANK0]: Device to NUMA mapping:
GPU 0 -> node 0
[HUGECTR][23:22:54][WARNING][RANK0]: Peer-to-peer access cannot be fully enabled.
[HUGECTR][23:22:54][INFO][RANK0]: Start all2all warmup
[HUGECTR][23:22:54][INFO][RANK0]: End all2all warmup
[HUGECTR][23:22:54][INFO][RANK0]: Use mixed precision: 0
[HUGECTR][23:22:54][INFO][RANK0]: start create embedding for inference
[HUGECTR][23:22:54][INFO][RANK0]: sparse_input name data1
[HUGECTR][23:22:54][INFO][RANK0]: create embedding for inference success
[HUGECTR][23:22:54][INFO][RANK0]: Inference stage skip BinaryCrossEntropyLoss layer, replaced by Sigmoid layer
Reading and preparing the data
First, we read the NVTabular processed data.
import pandas as pd
nvtdata_test = pd.read_parquet('./nvtabular_temp/output/test/part_0.parquet')
nvtdata_test.head()
ts_hour_user_id_brand_cross | ts_weekday_user_id_brand_cross | cat_0_user_id_brand_cross | cat_1_user_id_brand_cross | cat_2_user_id_brand_cross | product_id_user_id_cross | brand_user_id_cross | ts_hour_user_id_cross | ts_minute_user_id_cross | price | ts_weekday | ts_day | ts_month | TE_brand_target_TE | TE_user_id_target_TE | TE_product_id_target_TE | TE_cat_2_target_TE | TE_ts_weekday_ts_day_target_TE | target | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.823432 | -1.490397 | 1.272925 | -0.270035 | -1.287369 | 0.390281 | 0.390281 | -1.285848 | 0.445558 | 0.0 |
1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.627107 | -0.996241 | 0.581269 | -0.270035 | 0.306095 | 0.390281 | 0.339375 | 0.352659 | 0.396743 | 0.0 |
2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.299060 | -0.996241 | 1.388201 | -0.270035 | 0.364552 | 0.390281 | 0.443069 | 0.459563 | 0.433425 | 0.0 |
3 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.037364 | 1.474540 | -0.456215 | -0.270035 | 0.466595 | 0.390281 | 0.431157 | 0.460040 | 0.407608 | 1.0 |
4 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | -0.704362 | -0.502085 | -0.917319 | -0.270035 | 0.218853 | 0.390281 | 0.274339 | -1.285741 | 0.428683 | 0.0 |
con_feats = ['price',
'ts_weekday',
'ts_day',
'ts_month',
'TE_brand_target_TE',
'TE_user_id_target_TE',
'TE_product_id_target_TE',
'TE_cat_2_target_TE',
'TE_ts_weekday_ts_day_target_TE']
cat_feats = ['ts_hour_user_id_brand_cross',
'ts_weekday_user_id_brand_cross',
'cat_0_user_id_brand_cross',
'cat_1_user_id_brand_cross',
'cat_2_user_id_brand_cross',
'product_id_user_id_cross',
'brand_user_id_cross',
'ts_hour_user_id_cross',
'ts_minute_user_id_cross']
emb_size = [4427037, 3961156, 2877223, 2890639, 2159304, 4398425, 3009092, 3999369, 5931061]
Converting data to CSR format
HugeCTR expects data in CSR format for inference. One important thing to note is that NVTabular requires categorical variables to occupy different integer ranges. For example, if there are 10 users and 10 items, then the users should be encoded in the 0-9 range, while items should be in the 10-19 range. NVTabular encodes both users and items in the 0-9 ranges.
For this reason, we need to shift the keys of the categorical variable produced by NVTabular to comply with HugeCTR.
import numpy as np
shift = np.insert(np.cumsum(emb_size), 0, 0)[:-1]
cat_data = nvtdata_test[cat_feats].values + shift
dense_data = nvtdata_test[con_feats].values
def infer_batch(inference_session, dense_data_batch, cat_data_batch):
dense_features = list(dense_data_batch.flatten())
embedding_columns = list(cat_data_batch.flatten())
row_ptrs= list(range(0,len(embedding_columns)+1))
output = inference_session.predict(dense_features, embedding_columns, row_ptrs)
return output
Now we are ready to carry out inference on the test set.
batch_size = 4096
num_batches = (len(dense_data) // batch_size) + 1
batch_idx = np.array_split(np.arange(len(dense_data)), num_batches)
!pip install tqdm
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Requirement already satisfied: tqdm in /usr/local/lib/python3.8/dist-packages (4.62.3)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
WARNING: You are using pip version 21.2.4; however, version 21.3.1 is available.
You should consider upgrading via the '/usr/bin/python -m pip install --upgrade pip' command.
from tqdm import tqdm
labels = []
for batch_id in tqdm(batch_idx):
dense_data_batch = dense_data[batch_id]
cat_data_batch = cat_data[batch_id]
results = infer_batch(inference_session, dense_data_batch, cat_data_batch)
labels.extend(results)
len(labels)
2772486
Computing the test AUC value
ground_truth = nvtdata_test['target'].values
from sklearn.metrics import roc_auc_score
roc_auc_score(ground_truth, labels)
0.5565971533171648
Conclusion
In this notebook, we have walked you through the process of preprocessing the data, train a DLRM model with HugeCTR, then carrying out inference with the HugeCTR Python interface. Try this workflow on your data and let us know your feedback.