# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================================================

# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.
https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_transformers4rec_getting-started-session-based-01-etl-with-nvtabular/nvidia_logo.png

ETL with NVTabular

In this notebook we are going to generate synthetic data and then create sequential features with NVTabular. Such data will be used in the next notebook to train a session-based recommendation model.

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte-scale datasets used to train deep-learning-based recommender systems. It provides a high-level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS cuDF library.

Import required libraries

import os
import glob

import numpy as np
import pandas as pd

import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags
/usr/local/lib/python3.8/dist-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

Define Input/Output Path

# Root folder for the data written by this notebook; the INPUT_DATA_DIR
# environment variable, when set, overrides the default location.
INPUT_DATA_DIR = os.getenv("INPUT_DATA_DIR", "/workspace/data/")

Create Synthetic Input Data

# Number of synthetic interaction rows. Environment variables are always
# strings, so convert to int once here rather than at every use.
NUM_ROWS = int(os.environ.get("NUM_ROWS", 100000))

# Long-tailed item ids: log-normal draws truncated to integers and clipped
# to the range [1, 50000].
long_tailed_item_distribution = np.clip(
    np.random.lognormal(3., 1., NUM_ROWS).astype(np.int32), 1, 50000
)

# generate random item interaction features
df = pd.DataFrame(np.random.randint(70000, 90000, NUM_ROWS), columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# generate category mapping for each item-id: the item-id range is cut into
# 334 equal-width bins labelled 1..334, so a given item always gets the same category
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)
df['age_days'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)
df['weekday_sin'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)

# generate day mapping for each session: one random day in 1..9 per distinct
# session id, so every interaction of a session shares the same day
session_ids = df.session_id.unique()
map_day = dict(zip(session_ids, np.random.randint(1, 10, size=len(session_ids))))
df['day'] = df.session_id.map(map_day)

Visualize a couple of rows of the synthetic dataset:

# Preview the first rows of the synthetic interactions (head() defaults to 5 rows).
df.head()
session_id item_id category age_days weekday_sin day
0 75772 3 1 0.893401 0.830613 7
1 82179 77 21 0.892670 0.745608 5
2 83356 19 5 0.189608 0.011347 5
3 88757 177 48 0.059060 0.771164 7
4 82165 20 6 0.910964 0.449554 3

Feature Engineering with NVTabular

Deep Learning models require dense input features. Categorical features are sparse and need to be represented by dense embeddings in the model. To allow for that, categorical features first need to be encoded as contiguous integers (0, ..., |C|), where |C| is the feature cardinality (number of unique values), so that their embeddings can be efficiently stored in embedding layers. We will use NVTabular to preprocess the categorical features, so that all categorical columns are encoded as contiguous integers. Note that the Categorify op automatically encodes OOVs (out-of-vocabulary values) and nulls as 0. Our synthetic dataset does not contain any nulls. However, 0 is also used to pad the sequences in the input block. Since we reserve 0 for padding the sequence features, you can set the start_index=1 argument in the Categorify op if you want the encoded null or OOV values to start from 1 instead of 0.

Here our goal is to create sequential features. To do so, we are grouping the features together at the session level in the following cell. In this synthetically generated example dataset, we do not have a timestamp column, but if we had one (that’s the case for most real-world datasets), we would be sorting the interactions by the timestamp column as in this example notebook. Note that we also trim each feature sequence in a session to a certain length. Here, we use the NVTabular library so that we can easily preprocess and create features on GPU with a few lines.

# Maximum number of interactions kept per session; longer sessions are truncated.
SESSIONS_MAX_LENGTH = 20

# Categorify categorical features (encode them as contiguous integer ids)
categ_feats = ['session_id', 'item_id', 'category'] >> nvt.ops.Categorify()

# Define Groupby Workflow: the encoded categoricals plus the raw columns to aggregate
groupby_feats = categ_feats + ['day', 'age_days', 'weekday_sin']

# Group interaction features by session. With name_sep="-" the aggregated
# columns are named "<column>-<agg>", e.g. "item_id-list" or "day-first".
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs={
        "item_id": ["list", "count"],
        "category": ["list"],
        "day": ["first"],
        "age_days": ["list"],
        "weekday_sin": ["list"],
    },
    name_sep="-",
)

# Select and truncate the sequential features: the negative start index keeps
# the last SESSIONS_MAX_LENGTH interactions, and pad=True pads shorter
# sessions up to that length.
sequence_features_truncated = (
    groupby_features['category-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)
)

# The item-id sequence is additionally tagged as the item id column.
# TagAsItemID is referenced through nvt.ops, like every other op in this cell,
# rather than through the wildcard import.
sequence_features_truncated_item = (
    groupby_features['item_id-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)
    >> nvt.ops.TagAsItemID()
)

# Continuous sequences get an explicit CONTINUOUS tag in the schema.
sequence_features_truncated_cont = (
    groupby_features['age_days-list', 'weekday_sin-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

# Filter out sessions with length 1 (not valid for next-item prediction training and evaluation)
MINIMUM_SESSION_LENGTH = 2
selected_features = (
    groupby_features['item_id-count', 'day-first', 'session_id'] +
    sequence_features_truncated_item +
    sequence_features_truncated +
    sequence_features_truncated_cont
)

# "item_id-count" holds the number of interactions in each session.
filtered_sessions = selected_features >> nvt.ops.Filter(
    f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH
)

# ValueCount is applied to the list columns only; presumably it records their
# min/max list length in the output schema (value_count properties).
seq_feats_list = (
    filtered_sessions['item_id-list', 'category-list', 'age_days-list', 'weekday_sin-list']
    >> nvt.ops.ValueCount()
)

# Final graph: per-session scalar columns plus the sequence columns
workflow = nvt.Workflow(filtered_sessions['session_id', 'day-first', 'item_id-count'] + seq_feats_list)

# Wrap the pandas DataFrame in an NVTabular dataset; cpu=False presumably selects
# the GPU (cuDF) backend, consistent with the cuDF result produced below
dataset = nvt.Dataset(df, cpu=False)
# Generate statistics for the features
workflow.fit(dataset)
# Apply the preprocessing and return an NVTabular dataset
sessions_ds = workflow.transform(dataset)
# Convert the NVTabular dataset to a Dask cuDF dataframe (`to_ddf()`) and then to cuDF dataframe (`.compute()`)
sessions_gdf = sessions_ds.to_ddf().compute()
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
  warnings.warn(
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
  warnings.warn(
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
  warnings.warn(
# Inspect the first three processed sessions
sessions_gdf.head(3)
session_id day-first item_id-count item_id-list category-list age_days-list weekday_sin-list
0 1 7 16 [10, 33, 35, 68, 19, 4, 6, 4, 37, 104, 19, 30,... [5, 9, 11, 21, 4, 2, 2, 2, 11, 29, 4, 9, 7, 14... [0.2510539, 0.12130147, 0.61642516, 0.45710337... [0.66570914, 0.6149484, 0.98552155, 0.10168565...
1 2 9 14 [50, 99, 18, 39, 47, 43, 5, 75, 3, 8, 163, 30,... [14, 26, 6, 10, 14, 12, 1, 19, 1, 4, 45, 9, 2,... [0.17742382, 0.81522274, 0.75508606, 0.1395472... [0.29890507, 0.6564371, 0.96094626, 0.6960773,...
2 3 5 14 [26, 15, 51, 12, 40, 9, 54, 57, 8, 376, 57, 24... [7, 3, 13, 4, 10, 2, 15, 15, 4, 136, 15, 8, 6,... [0.21535604, 0.76454645, 0.82518786, 0.0410606... [0.037031054, 0.3980902, 0.95815617, 0.7962937...

It is possible to save the preprocessing workflow. That is useful to apply the same preprocessing to other data (with the same schema) and also to deploy the session-based recommendation pipeline to Triton Inference Server.

# Display the schema (name, tags, dtype and properties) of each output column of the workflow
workflow.output_schema
name tags dtype is_list is_ragged properties.num_buckets properties.freq_threshold properties.max_size properties.start_index properties.cat_path properties.domain.min properties.domain.max properties.domain.name properties.embedding_sizes.cardinality properties.embedding_sizes.dimension properties.value_count.min properties.value_count.max
0 session_id (Tags.CATEGORICAL) DType(name='int64', element_type=<ElementType.... False False NaN 0.0 0.0 0.0 .//categories/unique.session_id.parquet 0.0 19855.0 session_id 19856.0 408.0 NaN NaN
1 day-first () DType(name='int64', element_type=<ElementType.... False False NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
2 item_id-count (Tags.CATEGORICAL) DType(name='int32', element_type=<ElementType.... False False NaN 0.0 0.0 0.0 .//categories/unique.item_id.parquet 0.0 495.0 item_id 496.0 52.0 NaN NaN
3 item_id-list (Tags.ITEM, Tags.ID, Tags.ITEM_ID, Tags.LIST, ... DType(name='int64', element_type=<ElementType.... True False NaN 0.0 0.0 0.0 .//categories/unique.item_id.parquet 0.0 495.0 item_id 496.0 52.0 20.0 20.0
4 category-list (Tags.CATEGORICAL, Tags.LIST) DType(name='int64', element_type=<ElementType.... True False NaN 0.0 0.0 0.0 .//categories/unique.category.parquet 0.0 178.0 category 179.0 29.0 20.0 20.0
5 age_days-list (Tags.LIST, Tags.CONTINUOUS) DType(name='float32', element_type=<ElementTyp... True False NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN 20.0 20.0
6 weekday_sin-list (Tags.LIST, Tags.CONTINUOUS) DType(name='float32', element_type=<ElementTyp... True False NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN 20.0 20.0

The following will generate a schema.pbtxt file in the provided folder.

# The workflow has already been fitted on `dataset`, so only transform here:
# fitting again on the same data would just recompute identical statistics.
# Writing the result with to_parquet() stores the processed data in the
# "processed_nvt" folder under INPUT_DATA_DIR.
workflow.transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
  warnings.warn(
# Persist the fitted workflow so the same preprocessing can be applied to other data later
workflow.save(os.path.join(INPUT_DATA_DIR, "workflow_etl"))

Export pre-processed data by day

In this example we are going to split the preprocessed parquet files by days, to allow for temporal training and evaluation. There will be a folder for each day and three parquet files within each day folder: train.parquet, validation.parquet and test.parquet.

from transformers4rec.data.preprocessing import save_time_based_splits

# Destination of the per-day splits; the OUTPUT_DIR environment variable,
# when set, takes precedence over the default folder under INPUT_DATA_DIR.
default_output_dir = os.path.join(INPUT_DATA_DIR, "sessions_by_day")
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", default_output_dir)

# Partition the processed sessions by their "day-first" value. The synthetic
# data has no real timestamp, so session_id is passed as the timestamp column
# (presumably only as an ordering proxy).
save_time_based_splits(
    data=nvt.Dataset(sessions_gdf),
    output_dir=OUTPUT_DIR,
    partition_col="day-first",
    timestamp_col="session_id",
)
Creating time-based splits: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 9/9 [00:00<00:00, 19.93it/s]

Checking the preprocessed outputs

# Path of the training split inside the folder named "1" (the splits are partitioned by day)
TRAIN_PATHS = os.path.join(OUTPUT_DIR, "1", "train.parquet")
# Load the split with pandas to inspect what was exported
df = pd.read_parquet(TRAIN_PATHS)
df
session_id item_id-count item_id-list category-list age_days-list weekday_sin-list
0 6 14 [7, 11, 73, 6, 31, 5, 19, 63, 52, 1, 28, 19, 2... [2, 5, 19, 2, 9, 1, 4, 17, 13, 1, 7, 4, 8, 5, ... [0.84568787, 0.038363576, 0.7171949, 0.0886422... [0.9072822, 0.55461484, 0.2662152, 0.6641106, ...
1 9 14 [42, 22, 30, 26, 19, 9, 53, 5, 51, 5, 19, 3, 2... [12, 6, 9, 7, 4, 2, 15, 1, 13, 1, 4, 1, 8, 3, ... [0.4074032, 0.7792388, 0.49303588, 0.027537243... [0.65899414, 0.42423004, 0.20023833, 0.6077999...
2 14 13 [7, 60, 2, 7, 28, 2, 25, 24, 151, 74, 112, 31,... [2, 16, 1, 2, 7, 1, 8, 8, 40, 24, 29, 9, 17, 0... [0.9137222, 0.77429664, 0.4397028, 0.41606435,... [0.3428851, 0.9583178, 0.07852303, 0.8921527, ...
4 39 12 [67, 1, 16, 31, 21, 9, 14, 3, 8, 22, 23, 50, 0... [17, 1, 3, 9, 6, 2, 3, 1, 4, 6, 8, 14, 0, 0, 0... [0.7679332, 0.7644972, 0.8533882, 0.67827713, ... [0.87136024, 0.92441916, 0.27371496, 0.4557360...
5 52 12 [31, 17, 49, 13, 49, 16, 23, 85, 23, 164, 28, ... [9, 3, 13, 5, 13, 3, 8, 23, 8, 51, 7, 2, 0, 0,... [0.32460424, 0.9527502, 0.77985513, 0.91916, 0... [0.12728073, 0.87657094, 0.7073715, 0.9970732,...
... ... ... ... ... ... ...
2145 19158 2 [34, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... [9, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ... [0.44386843, 0.17579898, 0.0, 0.0, 0.0, 0.0, 0... [0.58763367, 0.997146, 0.0, 0.0, 0.0, 0.0, 0.0...
2146 19165 2 [1, 60, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... [1, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... [0.45839304, 0.15023704, 0.0, 0.0, 0.0, 0.0, 0... [0.47192892, 0.6211317, 0.0, 0.0, 0.0, 0.0, 0....
2148 19183 2 [23, 29, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0... [8, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ... [0.7376038, 0.7187783, 0.0, 0.0, 0.0, 0.0, 0.0... [0.4954509, 0.5675057, 0.0, 0.0, 0.0, 0.0, 0.0...
2149 19199 2 [52, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... [13, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... [0.96259063, 0.8100127, 0.0, 0.0, 0.0, 0.0, 0.... [0.3484375, 0.10194607, 0.0, 0.0, 0.0, 0.0, 0....
2150 19221 2 [3, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ... [1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ... [0.9268296, 0.71968925, 0.0, 0.0, 0.0, 0.0, 0.... [0.8299869, 0.7187812, 0.0, 0.0, 0.0, 0.0, 0.0...

1718 rows × 6 columns

import gc
# Drop the inspection dataframe and run a garbage-collection pass so its memory can be reclaimed
del df
gc.collect()
565

You have just created session-level features to train a session-based recommendation model using NVTabular. Now you can move to the next notebook, 02-session-based-XLNet-with-PyT.ipynb, to train a session-based recommendation model using XLNet, one of the state-of-the-art NLP models. Please shut down this kernel to free the GPU memory before you start the next one.