# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
End-to-end session-based recommendation with TensorFlow
In recent years, several deep learning-based algorithms have been proposed for recommendation systems, and their adoption in industry deployments has grown steeply. In particular, NLP-inspired approaches have been successfully adapted for sequential and session-based recommendation problems, which are important for many domains like e-commerce, news and streaming media. Session-Based Recommender Systems (SBRS) have been proposed to model the sequence of interactions within the current user session, where a session is a short sequence of user interactions typically bounded by user inactivity. They have recently gained popularity due to their ability to capture short-term or contextual user preferences towards items.
The field of NLP has evolved significantly within the last decade, particularly due to the increased usage of deep learning. As a result, state-of-the-art NLP approaches have inspired RecSys practitioners and researchers to adapt those architectures, especially for sequential and session-based recommendation problems. Here, we leverage one of the state-of-the-art Transformer-based architectures, XLNet, with the Causal Language Modeling (CLM) training technique. Causal LM is the task of predicting the token following a sequence of tokens, where the model only attends to the left context, i.e. it models the probability of a token given the previous tokens in a sentence (Lample and Conneau, 2019).
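To make the causal language modeling objective concrete, the sketch below builds next-item training pairs and a left-to-right attention mask for one toy session. It is a plain-Python illustration, not the Transformers4Rec implementation: the item ids and the helper names clm_pairs and causal_mask are assumptions made up for this example.

```python
# Illustrative sketch of causal language modeling (CLM) on a single session.
# The item ids and helper names are hypothetical, chosen only for this example.

def clm_pairs(session):
    """Return (inputs, labels) so that each position predicts the next item."""
    return session[:-1], session[1:]


def causal_mask(length):
    """mask[i][j] is 1 when position i may attend to position j (j <= i)."""
    return [[1 if j <= i else 0 for j in range(length)] for i in range(length)]


session = [12, 7, 33, 5]          # toy sequence of encoded item ids
inputs, labels = clm_pairs(session)
mask = causal_mask(len(inputs))

print("inputs:", inputs)
print("labels:", labels)
for row in mask:
    print(row)
```

Each row of the mask only exposes the current and earlier positions, which is what "attending to the left context" means in practice.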
In this end-to-end session-based recommender model example, we use the Transformers4Rec library, which leverages the popular HuggingFace Transformers NLP library and makes it possible to experiment with cutting-edge implementations of such architectures for sequential and session-based recommendation problems. For detailed explanations of the building blocks of the Transformers4Rec meta-architecture, see the getting-started-session-based and tutorial example notebooks.
1. Model definition using Transformers4Rec
In the previous notebook, we created sequential features and saved the processed data frames as parquet files. We now use these parquet files to train a session-based recommendation model with the XLNet architecture.
1.1 Import Libraries
import os
import glob
import cudf
import numpy as np
from nvtabular.loader.tensorflow import KerasSequenceLoader
from transformers4rec import tf as tr
from transformers4rec.tf.ranking_metric import NDCGAt, RecallAt
2021-12-06 20:25:03.688895: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-12-06 20:25:04.778884: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16254 MB memory: -> device: 0, name: Quadro GV100, pci bus id: 0000:15:00.0, compute capability: 7.0
# disable WARNING, INFO and DEBUG logging everywhere
import logging
logging.disable(logging.WARNING)
# avoid numba warnings
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0
1.2 Get the schema
The library uses a schema format to configure the input features and automatically creates the necessary layers. This protobuf text file contains the description of each input feature by defining: the name, the type, the number of elements of a list column, the cardinality of a categorical feature and the min and max values of each feature. In addition, the annotation field contains tags that specify, among others, the continuous and categorical features, the target column or the item_id feature.
from merlin_standard_lib import Schema
SCHEMA_PATH = "schema_demo.pb"
schema = Schema().from_proto_text(SCHEMA_PATH)
!cat $SCHEMA_PATH
feature {
name: "session_id"
type: INT
int_domain {
name: "session_id"
min: 1
max: 9249733
is_categorical: false
}
annotation {
tag: "groupby_col"
}
}
feature {
name: "item_id-list_seq"
value_count {
min: 2
max: 185
}
type: INT
int_domain {
name: "item_id/list"
min: 1
max: 52742
is_categorical: true
}
annotation {
tag: "item_id"
tag: "list"
tag: "categorical"
tag: "item"
}
}
feature {
name: "category-list_seq"
value_count {
min: 2
max: 185
}
type: INT
int_domain {
name: "category-list_seq"
min: 1
max: 337
is_categorical: true
}
annotation {
tag: "list"
tag: "categorical"
tag: "item"
}
}
feature {
name: "product_recency_days_log_norm-list_seq"
value_count {
min: 2
max: 185
}
type: FLOAT
float_domain {
name: "product_recency_days_log_norm-list_seq"
min: -2.9177291
max: 1.5231701
}
annotation {
tag: "continuous"
tag: "list"
}
}
feature {
name: "et_dayofweek_sin-list_seq"
value_count {
min: 2
max: 185
}
type: FLOAT
float_domain {
name: "et_dayofweek_sin-list_seq"
min: 0.7421683
max: 0.9995285
}
annotation {
tag: "continuous"
tag: "time"
tag: "list"
}
}
We can select the subset of features we want to use for training the model by their tags or their names.
schema = schema.select_by_name(
['item_id-list_seq', 'category-list_seq', 'product_recency_days_log_norm-list_seq', 'et_dayofweek_sin-list_seq']
)
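The call above selects features by name. Selection by tag follows the same idea, and the sketch below mimics it on a plain-Python stand-in for the schema, using the feature names and tags shown in the schema file. It deliberately does not use the merlin_standard_lib API; the select_by_tag helper defined here is hypothetical.

```python
# Plain-Python stand-in for the schema: names and tags copied from the
# schema_demo.pb listing. This is NOT the merlin_standard_lib Schema class.
features = [
    {"name": "session_id", "tags": ["groupby_col"]},
    {"name": "item_id-list_seq", "tags": ["item_id", "list", "categorical", "item"]},
    {"name": "category-list_seq", "tags": ["list", "categorical", "item"]},
    {"name": "product_recency_days_log_norm-list_seq", "tags": ["continuous", "list"]},
    {"name": "et_dayofweek_sin-list_seq", "tags": ["continuous", "time", "list"]},
]


def select_by_tag(features, tag):
    """Hypothetical helper: names of the features annotated with `tag`."""
    return [f["name"] for f in features if tag in f["tags"]]


categorical = select_by_tag(features, "categorical")
continuous = select_by_tag(features, "continuous")
print("categorical:", categorical)
print("continuous:", continuous)
```

The two resulting groups match the categorical and continuous column lists passed to the data loader later in this notebook.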
3.2 Define the end-to-end Session-based Transformer-based recommendation model
Defining the end-to-end session-based recommendation model requires four steps:
Instantiate the TabularSequenceFeatures input module from the schema to prepare the embedding tables of categorical variables and project continuous features, if specified. In addition, the module provides different aggregation methods (e.g. ‘concat’, ‘elementwise-sum’) to merge input features and generate the sequence of interaction embeddings. The module also supports language modeling tasks to prepare masked labels for training and evaluation (e.g. ‘clm’ for causal language modeling).
Define one or multiple prediction tasks. For this demo, we use NextItemPredictionTask with Causal Language Modeling (CLM).
Construct a transformer_config based on the architectures provided by the Hugging Face Transformers framework.
Link the transformer body to the inputs and the prediction tasks to get the final TensorFlow Model class.
For more details about the features supported by each sub-module, please check out the library documentation page.
max_sequence_length, d_model = 20, 320
# Define the evaluation top-N metrics and the cut-offs
metrics = [
    NDCGAt(top_ks=[10, 20], labels_onehot=True),
    RecallAt(top_ks=[10, 20], labels_onehot=True)
]
# Define input module to process tabular input-features and to prepare masked inputs
input_module = tr.TabularSequenceFeatures.from_schema(
    schema,
    max_sequence_length=max_sequence_length,
    continuous_projection=64,
    aggregation="concat",
    d_output=d_model,
    masking="clm",
)
# Define Next item prediction-task
prediction_task = tr.NextItemPredictionTask(weight_tying=True, metrics=metrics)
# Define the config of the XLNet Transformer architecture
transformer_config = tr.XLNetConfig.build(
    d_model=d_model, n_head=8, n_layer=2, total_seq_length=max_sequence_length
)
# Get the end-to-end model
model = transformer_config.to_tf_model(input_module, prediction_task)
model
Model()
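The NDCGAt and RecallAt metrics configured above are reported at cut-offs 10 and 20. As a rough intuition for what they measure in next-item prediction, the sketch below computes both for a single prediction with exactly one relevant item. It is a simplified plain-Python illustration under that assumption, not the library's implementation; the helper names and toy scores are made up.

```python
import math


def top_k_items(scores, k):
    """Indices of the k highest scores, best first."""
    return sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]


def recall_at_k(scores, label, k):
    """1.0 if the true next item is among the top-k predictions, else 0.0."""
    return 1.0 if label in top_k_items(scores, k) else 0.0


def ndcg_at_k(scores, label, k):
    """With a single relevant item the ideal DCG is 1, so NDCG = 1 / log2(rank + 1)."""
    ranked = top_k_items(scores, k)
    if label not in ranked:
        return 0.0
    rank = ranked.index(label) + 1  # 1-based position of the true item
    return 1.0 / math.log2(rank + 1)


scores = [0.1, 2.5, -1.0, 1.7, 0.3]  # toy logits over 5 items
label = 3                            # index of the true next item

print("recall@1:", recall_at_k(scores, label, k=1))
print("recall@2:", recall_at_k(scores, label, k=2))
print("ndcg@2:", ndcg_at_k(scores, label, k=2))
```

Recall only checks whether the true item is in the top-k list, while NDCG also rewards ranking it closer to the top.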
3.3. Daily Fine-Tuning: Training over a time window
Now that the model is defined, we can launch training. In this example, we conduct time-based fine-tuning by iteratively training and evaluating over a sliding time window: at each iteration, we train the model on the data of a specific time index $t$ and then evaluate it on the validation data of the next index $t + 1$. Here, the start time index is 178 and the final time index is 180, so the model is trained on days 178 and 179 and evaluated on days 179 and 180. We train with tf.keras’ model.fit() and evaluate with model.evaluate().
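The sliding-window schedule can be enumerated without touching any data. The sketch below lists which file would be used for training and which for evaluation at each step, assuming a directory layout of one folder per day containing train.parquet and valid.parquet. The helper name window_schedule and the base directory are assumptions for illustration only.

```python
import os


def window_schedule(start_index, final_index, base_dir):
    """Yield (train_path, eval_path): train on day t, evaluate on day t + 1."""
    for t in range(start_index, final_index):
        yield (
            os.path.join(base_dir, str(t), "train.parquet"),
            os.path.join(base_dir, str(t + 1), "valid.parquet"),
        )


# Assumed base directory; adjust to wherever the preprocessed days are stored.
schedule = list(window_schedule(178, 180, "preproc_sessions_by_day"))
for train_path, eval_path in schedule:
    print(train_path, "->", eval_path)
```

Because the model is not re-created between steps, each iteration continues from the weights learned on the previous days.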
Set up the DataLoader
We use the NVTabular KerasSequenceLoader data loader for optimized loading of multiple features from input parquet files. In our experiments, we see a 9x speed-up of the same training workflow with the NVTabular data loader. You can learn more about this data loader in the NVTabular repository listed in the References.
# Define categorical and continuous columns
x_cat_names = ['item_id-list_seq', 'category-list_seq']
x_cont_names = ['product_recency_days_log_norm-list_seq', 'et_dayofweek_sin-list_seq']
# dictionary representing max sequence length for each column
sparse_features_max = {
fname: 20
for fname in x_cat_names + x_cont_names
}
def get_dataloader(paths_or_dataset, batch_size=384):
    dataloader = KerasSequenceLoader(
        paths_or_dataset,
        batch_size=batch_size,
        label_names=None,
        cat_names=x_cat_names,
        cont_names=x_cont_names,
        sparse_names=list(sparse_features_max.keys()),
        sparse_max=sparse_features_max,
        sparse_as_dense=True,
    )
    return dataloader.map(lambda X, y: (X, []))
We set the targets to [] in the data loader because the true item labels are computed internally by the MaskSequence class.
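The sparse_features_max dictionary together with sparse_as_dense=True asks the loader to return every list column as a dense tensor with 20 positions per session. The sketch below illustrates the idea on plain Python lists. It is a simplified stand-in, not the NVTabular implementation: the pad value of 0 and keeping the first max_len elements when truncating are assumptions of this example.

```python
def pad_or_truncate(seq, max_len, pad_value=0):
    """Return a list of exactly max_len elements.

    Assumptions of this sketch: short sequences are right-padded with
    pad_value and long sequences keep their first max_len elements.
    """
    trimmed = list(seq[:max_len])
    return trimmed + [pad_value] * (max_len - len(trimmed))


sessions = [
    [5, 9, 3],                # short session: gets padded
    list(range(1, 26)),       # long session (25 items): gets truncated
]
dense = [pad_or_truncate(s, max_len=20) for s in sessions]

for row in dense:
    print(len(row), row)
```

With a fixed length per column, every batch has the same rectangular shape, which is what the Transformer body expects.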
import tensorflow as tf
opt = tf.keras.optimizers.Adam(learning_rate=0.0005)
# set run_eagerly to True to run the model eagerly
model.compile(optimizer=opt, run_eagerly=False)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", './preproc_sessions_by_day')
start_time_window_index = 178
final_time_window_index = 180
# Iterate over the time windows: train on day t, evaluate on day t + 1
for time_index in range(start_time_window_index, final_time_window_index):
    # Set data
    time_index_train = time_index
    time_index_eval = time_index + 1
    train_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_train}/train.parquet"))
    eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))
    # Train on day related to time_index
    print('*' * 20)
    print("Launch training for day %s are:" %time_index)
    print('*' * 20 + '\n')
    train_loader = get_dataloader(train_paths, batch_size=384)
    losses = model.fit(train_loader, epochs=5)
    model.reset_metrics()
    # Evaluate on the following day
    eval_loader = get_dataloader(eval_paths, batch_size=512)
    eval_metrics = model.evaluate(eval_loader, return_dict=True)
    print('*'*20)
    print("Eval results for day %s are:\t" %time_index_eval)
    print('\n' + '*' * 20 + '\n')
    for key in sorted(eval_metrics.keys()):
        print(" %s = %s" % (key, str(eval_metrics[key])))
********************
Launch training for day 178 are:
********************
2021-12-06 20:25:15.775103: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8204
2021-12-06 20:25:15.920777: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/5
75/75 [==============================] - 28s 231ms/step - train_ndcg@10: 0.0650 - train_ndcg@20: 0.0712 - train_recall@10: 0.0931 - train_recall@20: 0.1178 - loss: 9.2902 - regularization_loss: 0.0000e+00 - total_loss: 9.2902
Epoch 2/5
75/75 [==============================] - 20s 229ms/step - train_ndcg@10: 0.5058 - train_ndcg@20: 0.5179 - train_recall@10: 0.5796 - train_recall@20: 0.6275 - loss: 3.9481 - regularization_loss: 0.0000e+00 - total_loss: 3.9481
Epoch 3/5
75/75 [==============================] - 20s 229ms/step - train_ndcg@10: 0.6742 - train_ndcg@20: 0.6805 - train_recall@10: 0.7266 - train_recall@20: 0.7520 - loss: 2.6460 - regularization_loss: 0.0000e+00 - total_loss: 2.6460
Epoch 4/5
75/75 [==============================] - 20s 226ms/step - train_ndcg@10: 0.7045 - train_ndcg@20: 0.7107 - train_recall@10: 0.7525 - train_recall@20: 0.7773 - loss: 2.3314 - regularization_loss: 0.0000e+00 - total_loss: 2.3314
Epoch 5/5
75/75 [==============================] - 20s 226ms/step - train_ndcg@10: 0.7167 - train_ndcg@20: 0.7237 - train_recall@10: 0.7655 - train_recall@20: 0.7930 - loss: 2.1691 - regularization_loss: 0.0000e+00 - total_loss: 2.1691
6/6 [==============================] - 4s 276ms/step - eval_ndcg@10: 0.5293 - eval_ndcg@20: 0.5330 - eval_recall@10: 0.5957 - eval_recall@20: 0.6103 - loss: 3.9674 - regularization_loss: 0.0000e+00 - total_loss: 3.9674
********************
Eval results for day 179 are:
********************
eval_ndcg@10 = 0.16843447089195251
eval_ndcg@20 = 0.16843447089195251
eval_recall@10 = 0.3142857253551483
eval_recall@20 = 0.3142857253551483
loss = 6.932893753051758
regularization_loss = 0
total_loss = 6.932893753051758
********************
Launch training for day 179 are:
********************
Epoch 1/5
54/54 [==============================] - 15s 234ms/step - train_ndcg@10: 0.6677 - train_ndcg@20: 0.6739 - train_recall@10: 0.7088 - train_recall@20: 0.7331 - loss: 2.8084 - regularization_loss: 0.0000e+00 - total_loss: 2.8084
Epoch 2/5
54/54 [==============================] - 15s 235ms/step - train_ndcg@10: 0.7027 - train_ndcg@20: 0.7093 - train_recall@10: 0.7493 - train_recall@20: 0.7751 - loss: 2.4409 - regularization_loss: 0.0000e+00 - total_loss: 2.4409
Epoch 3/5
54/54 [==============================] - 15s 234ms/step - train_ndcg@10: 0.7271 - train_ndcg@20: 0.7339 - train_recall@10: 0.7733 - train_recall@20: 0.8000 - loss: 2.2137 - regularization_loss: 0.0000e+00 - total_loss: 2.2137
Epoch 4/5
54/54 [==============================] - 15s 231ms/step - train_ndcg@10: 0.7432 - train_ndcg@20: 0.7493 - train_recall@10: 0.7904 - train_recall@20: 0.8145 - loss: 2.0358 - regularization_loss: 0.0000e+00 - total_loss: 2.0358
Epoch 5/5
54/54 [==============================] - 15s 240ms/step - train_ndcg@10: 0.7558 - train_ndcg@20: 0.7619 - train_recall@10: 0.8051 - train_recall@20: 0.8291 - loss: 1.8771 - regularization_loss: 0.0000e+00 - total_loss: 1.8771
5/5 [==============================] - 2s 303ms/step - eval_ndcg@10: 0.5249 - eval_ndcg@20: 0.5308 - eval_recall@10: 0.5637 - eval_recall@20: 0.5873 - loss: 4.5000 - regularization_loss: 0.0000e+00 - total_loss: 4.5000
********************
Eval results for day 180 are:
********************
eval_ndcg@10 = 0.12385287880897522
eval_ndcg@20 = 0.13401004672050476
eval_recall@10 = 0.1855670064687729
eval_recall@20 = 0.22680412232875824
loss = 8.349146842956543
regularization_loss = 0
total_loss = 8.349146842956543
Export the preprocessing workflow and model in the format required by Triton server:
NVTabular’s export_tensorflow_ensemble() function enables us to create the model files and config files to be served by Triton Inference Server.
import nvtabular as nvt
workflow = nvt.Workflow.load('workflow_etl')
from nvtabular.inference.triton import export_tensorflow_ensemble
export_tensorflow_ensemble(
model,
workflow,
name="t4r_tf",
model_path='/workspace/TF4Rec/models/tf/',
label_columns=[],
sparse_max=sparse_features_max
)
/nvtabular/nvtabular/inference/triton/ensemble.py:80: UserWarning: TF model expects int64 for column category-list_seq, but workflow is producing type list. Overriding dtype in NVTabular workflow.
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:80: UserWarning: TF model expects float32 for column et_dayofweek_sin-list_seq, but workflow is producing type list. Overriding dtype in NVTabular workflow.
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:80: UserWarning: TF model expects int64 for column item_id-list_seq, but workflow is producing type list. Overriding dtype in NVTabular workflow.
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:80: UserWarning: TF model expects float32 for column product_recency_days_log_norm-list_seq, but workflow is producing type list. Overriding dtype in NVTabular workflow.
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:279: UserWarning: Column session_id is being generated by NVTabular workflow but is unused in t4r_tf_tf model
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:279: UserWarning: Column item_id-count is being generated by NVTabular workflow but is unused in t4r_tf_tf model
warnings.warn(
/nvtabular/nvtabular/inference/triton/ensemble.py:279: UserWarning: Column day_index is being generated by NVTabular workflow but is unused in t4r_tf_tf model
warnings.warn(
4. Serving Ensemble Model to the Triton Inference Server
NVIDIA Triton Inference Server (TIS) simplifies the deployment of AI models at scale in production. TIS provides a cloud and edge inferencing solution optimized for both CPUs and GPUs. It supports a number of different machine learning frameworks such as TensorFlow and PyTorch.
The last step of a machine learning (ML)/deep learning (DL) pipeline is to deploy the ETL workflow and the saved model to production. In the production setting, we want to transform the input data in the same way as during training (ETL). We need to apply the same mean/std to continuous features and use the same categorical mapping to convert the categories to contiguous integers before we use the DL model for a prediction. Therefore, we deploy the NVTabular workflow with the TensorFlow model as an ensemble model to Triton Inference Server. The ensemble model guarantees that the same transformation is applied to the raw inputs.
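The need to reuse training-time statistics at inference can be illustrated with a toy transform. The sketch below fits a mean/std and a category mapping once, then applies the same fitted values to a new request. It is a plain-Python stand-in for the idea, not the NVTabular workflow API: the class name, the feature values and the choice to reserve id 0 for unseen categories are assumptions of this example.

```python
import statistics


class FittedTransform:
    """Toy stand-in for a fitted ETL workflow (hypothetical, not NVTabular)."""

    def fit(self, values, categories):
        # Statistics and mapping are computed once, on the training data only.
        self.mean = statistics.mean(values)
        self.std = statistics.pstdev(values)
        # Contiguous ids starting at 1; id 0 is reserved for unseen categories.
        self.mapping = {c: i + 1 for i, c in enumerate(sorted(set(categories)))}
        return self

    def transform(self, value, category):
        # Inference reuses the training-time mean/std and category mapping.
        return (value - self.mean) / self.std, self.mapping.get(category, 0)


etl = FittedTransform().fit([10.0, 20.0, 30.0], ["shoes", "bags", "shoes"])

print(etl.transform(20.0, "shoes"))  # a known category
print(etl.transform(30.0, "hats"))   # a category never seen in training
```

Recomputing the statistics or the mapping on serving data would silently change the meaning of the model's inputs, which is why the fitted workflow is shipped together with the model.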
In this section, you will learn how to
deploy saved NVTabular and TensorFlow models to Triton Inference Server
send requests for predictions and get responses.
4.1. Pull and Start Inference Container
At this point, we launch the Triton server, and then load the ensemble t4r_tf to the inference server below.
Start Triton server
You can start the Triton server with the command below. You need to provide the correct path to the models folder.
tritonserver --model-repository=<path_to_models> --backend-config=tensorflow,version=2 --model-control-mode=explicit
Note: The model-repository path for our example is /workspace/TF4Rec/models/tf/. The models haven’t been loaded yet. We will request the Triton server to load the saved ensemble model below.
Connect to the Triton Inference Server and check if the server is alive
import tritonhttpclient
try:
    triton_client = tritonhttpclient.InferenceServerClient(url="localhost:8000", verbose=True)
    print("client created.")
except Exception as e:
    print("channel creation failed: " + str(e))
triton_client.is_server_live()
client created.
GET /v2/health/live, headers None
<HTTPSocketPoolResponse status=200 headers={'content-length': '0', 'content-type': 'text/plain'}>
/usr/local/lib/python3.8/dist-packages/tritonhttpclient/__init__.py:31: DeprecationWarning: The package `tritonhttpclient` is deprecated and will be removed in a future version. Please use instead `tritonclient.http`
warnings.warn(
True
Load raw data for inference
We select the last 50 interactions and filter out sessions with fewer than 2 interactions.
interactions_merged_df = cudf.read_parquet('/workspace/data/interactions_merged_df.parquet')
interactions_merged_df = interactions_merged_df.sort_values('timestamp')
batch = interactions_merged_df[-50:]
sessions_to_use = batch.session_id.value_counts()
# ignore sessions with less than 2 interactions
filtered_batch = batch[batch.session_id.isin(sessions_to_use[sessions_to_use.values > 1].index.values)]
Send the request to triton server
triton_client.get_model_repository_index()
POST /v2/repository/index, headers None
<HTTPSocketPoolResponse status=200 headers={'content-type': 'application/json', 'content-length': '62'}>
bytearray(b'[{"name":"t4r_tf"},{"name":"t4r_tf_nvt"},{"name":"t4r_tf_tf"}]')
[{'name': 't4r_tf'}, {'name': 't4r_tf_nvt'}, {'name': 't4r_tf_tf'}]
Load the ensemble model to triton
If all models are loaded successfully, you should see a successfully loaded status next to each model name in your terminal.
triton_client.load_model(model_name="t4r_tf")
POST /v2/repository/models/t4r_tf/load, headers None
<HTTPSocketPoolResponse status=200 headers={'content-type': 'application/json', 'content-length': '0'}>
Loaded model 't4r_tf'
import nvtabular.inference.triton as nvt_triton
import tritonclient.grpc as grpcclient
# Convert the filtered dataframe into Triton input tensors
inputs = nvt_triton.convert_df_to_triton_input(filtered_batch.columns, filtered_batch, grpcclient.InferInput)
output_names = ["output_1"]
# Request the named outputs explicitly
outputs = [grpcclient.InferRequestedOutput(col) for col in output_names]
MODEL_NAME_NVT = "t4r_tf"
with grpcclient.InferenceServerClient("localhost:8001") as client:
    response = client.infer(MODEL_NAME_NVT, inputs, outputs=outputs)
for col in output_names:
    print(col, ':\n', response.as_numpy(col))
output_1 :
[[-23.45367 -20.654154 -12.082266 ... -19.95935 -17.551102 -21.46679 ]
[-15.886233 -16.107304 -5.136529 ... -15.80361 -15.616753 -14.409036]
[-20.026276 -16.286356 -9.805498 ... -17.55959 -15.639177 -17.18394 ]
...
[-20.121122 -16.542429 -5.615509 ... -15.828735 -16.362495 -19.196321]
[-19.131699 -16.101086 -6.773807 ... -16.170242 -15.505514 -15.895823]
[-16.898193 -15.042047 -7.174122 ... -14.768758 -16.249992 -15.368987]]
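def visualize_response(batch, response, top_k, session_col="session_id"):
    """
    Util function to extract top-k encoded item-ids from logits
    Parameters
    ----------
    batch: dataframe with the raw interactions sent to the server
    response: Triton inference response holding the "output_1" logits
    top_k: number of item-ids to show per session
    session_col: name of the session id column
    """
    sessions = batch[session_col].drop_duplicates().values
    predictions = response.as_numpy("output_1")
    # argpartition returns the top-k indices in no particular order
    top_preds = np.argpartition(predictions, -top_k, axis=1)[:, -top_k:]
    for session, next_items in zip(sessions, top_preds):
        print(
            "- Top-%s predictions for session `%s`: %s\n"
            % (top_k, session, " || ".join([str(e) for e in next_items]))
        )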
Visualise top-k predictions
visualize_response(filtered_batch, response, top_k=5, session_col='session_id')
- Top-5 predictions for session `11257991`: 10790 || 202 || 5168 || 9334 || 4111
- Top-5 predictions for session `11270119`: 1697 || 18021 || 101 || 2 || 5856
- Top-5 predictions for session `11311424`: 6988 || 8622 || 5064 || 1297 || 6603
- Top-5 predictions for session `11336059`: 28411 || 6607 || 19414 || 1435 || 2259
- Top-5 predictions for session `11394056`: 6510 || 8769 || 3590 || 18021 || 5856
- Top-5 predictions for session `11399751`: 1470 || 2336 || 7613 || 1853 || 1657
- Top-5 predictions for session `11401481`: 28 || 1436 || 5892 || 1229 || 2196
- Top-5 predictions for session `11421333`: 5168 || 9334 || 3233 || 2336 || 3632
- Top-5 predictions for session `11425751`: 2336 || 4541 || 18021 || 74 || 7613
- Top-5 predictions for session `11445777`: 2888 || 2016 || 1342 || 184 || 664
- Top-5 predictions for session `11457123`: 1853 || 7613 || 13292 || 1470 || 25106
- Top-5 predictions for session `11467406`: 5168 || 745 || 2844 || 8769 || 2375
- Top-5 predictions for session `11493827`: 366 || 1294 || 18021 || 2067 || 2336
- Top-5 predictions for session `11528554`: 540 || 4067 || 2034 || 7613 || 500
- Top-5 predictions for session `11561822`: 771 || 6607 || 1306 || 5958 || 18021
As you may have noticed, we first got the prediction results (logits) from the trained model head, and then used the handy util function visualize_response to extract the top-k encoded item-ids from the logits. In other words, we generated recommended items for a given session.
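Note that np.argpartition only guarantees that the selected indices are the k largest; it does not order them by score. If a ranked list or normalized scores are needed, a small post-processing step can help. The sketch below shows one way to do this in plain Python on a toy row of logits; the helper names and the logit values are made up for illustration and are not part of the notebook's utilities.

```python
import math


def ranked_top_k(logits, k):
    """Indices of the k largest logits, ordered from best to worst."""
    return sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)[:k]


def softmax(logits):
    """Convert logits to probabilities (max is subtracted for numerical stability)."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [-23.4, -20.6, -12.0, -19.9, -5.1]  # toy scores for 5 encoded item ids
top = ranked_top_k(logits, k=3)
probs = softmax(logits)

for item_id in top:
    print(item_id, round(probs[item_id], 6))
```

The ids returned here are still encoded item ids; mapping them back to raw item ids requires the categorical mapping produced by the preprocessing workflow.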
This is the end of these example notebooks. You successfully
performed feature engineering with NVTabular
trained Transformer-based session-based recommendation models with Transformers4Rec
deployed a trained model to Triton Inference Server, sent requests and received responses from the server.
Unload models
triton_client.unload_model(model_name="t4r_tf")
triton_client.unload_model(model_name="t4r_tf_nvt")
triton_client.unload_model(model_name="t4r_tf_tf")
References
Merlin Transformers4rec: https://github.com/NVIDIA-Merlin/Transformers4Rec
Merlin NVTabular: https://github.com/NVIDIA-Merlin/NVTabular/tree/main/nvtabular
Triton inference server: https://github.com/triton-inference-server
Guillaume Lample and Alexis Conneau. “Cross-lingual language model pretraining.” arXiv preprint arXiv:1901.07291 (2019).