# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================
Building Intelligent Recommender Systems with Merlin
Overview
Recommender Systems (RecSys) are the engine of the modern internet and the catalyst for human decisions. Building a recommender system is challenging because it requires multiple stages (data preprocessing, offline training, item retrieval, filtering, ranking, ordering, etc.) to work together seamlessly and efficiently. The biggest challenges for new practitioners are the lack of understanding of what recommender systems look like in the real world, and the gap between examples of simple models and a production-ready, end-to-end recommender system.
The figure below represents a four-stage recommender system. This is a more complex process than training a single model and deploying it, and it is much more realistic and closer to what happens in real-world production recommender systems.
In this series of notebooks, we showcase how to easily deploy a four-stage recommender system on Triton Inference Server using the Merlin Systems library. Let’s go over the concepts in the figure briefly.
Retrieval: This is the step that narrows down millions of items to thousands of candidates. We are going to train a Two-Tower item retrieval model to retrieve the top-K most relevant candidate items.
Filtering: This step excludes already-interacted or otherwise undesirable items from the candidate set, or applies business logic rules. Although this is an important step, we skip it in this example.
Scoring: This is also known as ranking. Here, the retrieved and filtered candidate items are scored. We are going to train a ranking model to use at the scoring step.
Ordering: At this stage, we order the final set of items that we want to recommend to the user. Here, we are able to align the output of the model with business needs, constraints, or criteria.
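Taken together, the four stages form a simple pipeline at serving time. The sketch below is purely illustrative pseudocode of that control flow (the helper functions are hypothetical placeholders, not Merlin APIs); the rest of this example trains the real retrieval and ranking models, and the next notebook wires the actual stages together with Merlin Systems.
# Illustrative pseudocode of the four-stage flow; the helper functions are hypothetical placeholders.
def recommend(user_id, k=10):
    candidates = retrieve_candidates(user_id, num_candidates=1000)   # Retrieval: millions -> thousands
    candidates = filter_items(user_id, candidates)                   # Filtering: drop seen/undesirable items
    scores = score_candidates(user_id, candidates)                   # Scoring: ranking model predicts relevance
    return apply_business_ordering(candidates, scores)[:k]           # Ordering: final business-aware ordering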
To learn more about the four-stage recommender systems, you can listen to Even Oldridge’s Moving Beyond Recommender Models talk at KDD’21 and read more in this blog post.
Learning objectives
Understanding four stages of recommender systems
Training retrieval and ranking models with Merlin Models
Setting up feature store and approximate nearest neighbours (ANN) search libraries
Deploying trained models to Triton Inference Server with Merlin Systems
In addition to the NVIDIA Merlin libraries and the Triton Inference Server client library, we use two external libraries in this series of examples:
Feast: an end-to-end open source feature store library for machine learning
Faiss: a library for efficient similarity search and clustering of dense vectors
You can find more information about the Feast feature store and Faiss libraries in the next notebook. Please follow the instructions in the README.md file to install these libraries.
Import required libraries and functions
Compatibility:
These notebooks were developed and tested using our latest inference container from NVIDIA’s container registry.
%pip install tensorflow "feast<0.20" faiss-gpu
import os
os.environ["TF_GPU_ALLOCATOR"]="cuda_malloc_async"
import glob
import gc
import nvtabular as nvt
from nvtabular.ops import *
from merlin.models.utils.example_utils import workflow_fit_transform
from merlin.schema.tags import Tags
import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf
2022-05-05 14:34:58.341639: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-05 14:35:00.407290: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:214] Using CUDA malloc Async allocator for GPU: 0
2022-05-05 14:35:00.407448: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 30667 MB memory: -> device: 0, name: Quadro GV100, pci bus id: 0000:15:00.0, compute capability: 7.0
2022-05-05 14:35:00.408163: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:214] Using CUDA malloc Async allocator for GPU: 1
2022-05-05 14:35:00.408224: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 30315 MB memory: -> device: 1, name: Quadro GV100, pci bus id: 0000:2d:00.0, compute capability: 7.0
# disable INFO and DEBUG logging everywhere
import logging
logging.disable(logging.WARNING)
In this example notebook, we generate synthetic train and validation datasets that mimic the real Ali-CCP (Alibaba Click and Conversion Prediction) dataset, which we use to build our recommender system models.
First, we define our input and output paths.
DATA_FOLDER = os.environ.get("DATA_FOLDER", "/workspace/data/")
output_path = os.path.join(DATA_FOLDER, 'processed/ranking')
Then, we use the generate_data utility function to generate the synthetic dataset.
from merlin.datasets.synthetic import generate_data
NUM_ROWS = 100000
train, valid = generate_data("aliccp-raw", int(NUM_ROWS), set_sizes=(0.7, 0.3))
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
If you would like to use the real Ali-CCP dataset, you can use the get_aliccp() function instead. This function takes the raw csv files and generates parquet files that can be fed directly to the NVTabular workflow.
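As a rough illustration only, the call would look something like the commented sketch below; the exact import path and signature may differ between Merlin versions, so please check the get_aliccp docstring before running it.
# Hypothetical sketch of using the real dataset instead of synthetic data; verify the
# import path and signature against your installed merlin.datasets version.
# from merlin.datasets.ecommerce import get_aliccp
# train, valid = get_aliccp("/path/to/aliccp/raw/")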
Feature Engineering with NVTabular
We are going to process our raw categorical features by encoding them with the Categorify() operator and tag the features with user or item tags in the schema. To learn more about NVTabular and the schema object, visit this example notebook in the Merlin Models repo.
%%time
user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()
item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures()
user_features = ['user_shops', 'user_profile', 'user_group',
'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
>> Categorify(dtype='int32') >> TagAsUserFeatures()
targets = ["click"] >> AddMetadata(tags=[Tags.BINARY_CLASSIFICATION, "target"])
outputs = user_id+item_id+item_features+user_features+targets
CPU times: user 151 µs, sys: 45 µs, total: 196 µs
Wall time: 200 µs
Let’s call the transform_aliccp utility function to perform the fit and transform steps on the raw dataset, applying the operators defined in the NVTabular workflow pipeline above, and to save our workflow. After fit and transform, the processed parquet files are saved to output_path.
from merlin.datasets.ecommerce import transform_aliccp
transform_aliccp((train, valid), output_path, nvt_workflow=outputs, workflow_name='workflow_ranking')
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
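transform_aliccp is a convenience wrapper around an NVTabular Workflow. Doing roughly the same thing by hand would look like the sketch below; it is a simplified illustration of the fit/transform/save steps, not the utility’s exact implementation, and the save path is only an example.
# Simplified, hand-rolled equivalent of what transform_aliccp does (illustration only)
workflow = nvt.Workflow(outputs)
workflow.fit(train)                                                    # learn categorical mappings on the train split
workflow.transform(train).to_parquet(os.path.join(output_path, "train"))
workflow.transform(valid).to_parquet(os.path.join(output_path, "valid"))
workflow.save(os.path.join(output_path, "workflow_ranking"))           # persist the workflow for deployment (path for illustration)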
Training a Ranking Model with DLRM
NVTabular exported the schema of our processed dataset as a protobuf text file, schema.pbtxt. To learn more about the schema object and schema file, you can explore the 02-Merlin-Models-and-NVTabular-integration.ipynb notebook.
We use the schema object to define our model.
# define train and valid dataset objects
train = Dataset(os.path.join(output_path, 'train', '*.parquet'), part_size="500MB")
valid = Dataset(os.path.join(output_path, 'valid', '*.parquet'), part_size="500MB")
# define schema object
schema = train.schema
target_column = schema.select_by_tag(Tags.TARGET).column_names[0]
target_column
'click'
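Before defining the model, it can be useful to double-check which columns the workflow tagged as user and item features, since Merlin Models relies on these tags when building the input blocks. An optional quick inspection:
# Optional: inspect which columns carry the user/item tags assigned in the NVTabular workflow
print(schema.select_by_tag(Tags.USER).column_names)
print(schema.select_by_tag(Tags.ITEM).column_names)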
The Deep Learning Recommendation Model (DLRM) architecture is a popular neural network model originally proposed by Facebook in 2019. It was introduced as a personalization deep learning model that uses embeddings to process sparse features representing categorical data and a multilayer perceptron (MLP) to process dense features, and then interacts these features explicitly using the statistical techniques proposed here. To learn more about the DLRM architecture, please visit the Exploring-different-models notebook in the Merlin Models GitHub repo.
model = mm.DLRMModel(
schema,
embedding_dim=64,
bottom_block=mm.MLPBlock([128, 64]),
top_block=mm.MLPBlock([128, 64, 32]),
prediction_tasks=mm.BinaryClassificationTask(target_column, metrics=[tf.keras.metrics.AUC()])
)
model.compile(optimizer='adam', run_eagerly=False)
model.fit(train, validation_data=valid, batch_size=16*1024)
2022-05-05 14:35:03.403519: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
5/5 [==============================] - 6s 217ms/step - auc: 0.4983 - loss: 0.6931 - regularization_loss: 0.0000e+00 - total_loss: 0.6931 - val_auc: 0.4995 - val_loss: 0.6932 - val_regularization_loss: 0.0000e+00 - val_total_loss: 0.6932
2022-05-05 14:35:10.085952: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/branch_executed/_13
<keras.callbacks.History at 0x7fa0bab07520>
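The validation metrics printed during fit already give a quick signal. If you also want a standalone evaluation pass over the validation set after training, the Keras-style evaluate call should work here as well; this is optional and shown only as a sketch.
# Optional: evaluate the trained ranking model on the validation set
eval_metrics = model.evaluate(valid, batch_size=16 * 1024, return_dict=True)
eval_metrics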
We will create the feature repo in the current working directory, which is BASE_DIR for us.
# set up the base dir to for feature store
BASE_DIR = os.environ.get("BASE_DIR", "/Merlin/examples/Building-and-deploying-multi-stage-RecSys/")
Let’s save our DLRM model so that we can load it back at the deployment stage.
model.save(os.path.join(BASE_DIR, 'dlrm'))
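As a sanity check, the saved artifact can be reloaded like a regular TensorFlow SavedModel. The commented sketch below is only an illustration: reloading Merlin models outside of Merlin Systems may require merlin.models to be imported so its custom layers are available, and the actual serving-time loading is handled by Triton in the next notebook.
# Optional sketch: reload the saved ranking model outside of Triton (illustration only)
# loaded_dlrm = tf.keras.models.load_model(os.path.join(BASE_DIR, "dlrm"))
# loaded_dlrm.summary()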
Training a Retrieval Model with Two-Tower Model
Now we move on to the offline retrieval stage. We are going to train a Two-Tower model for item retrieval. To learn more about the Two-Tower model, you can visit 05-Retrieval-Model.ipynb.
output_path = os.path.join(DATA_FOLDER, 'processed/retrieval')
We select only the positive interaction rows, where click==1, using the Filter() operator.
user_id = ["user_id"] >> Categorify(dtype='int32') >> TagAsUserID()
item_id = ["item_id"] >> Categorify(dtype='int32') >> TagAsItemID()
item_features = ["item_category", "item_shop", "item_brand"] >> Categorify(dtype='int32') >> TagAsItemFeatures()
user_features = ['user_shops', 'user_profile', 'user_group',
'user_gender', 'user_age', 'user_consumption_2', 'user_is_occupied',
'user_geography', 'user_intentions', 'user_brands', 'user_categories'] \
>> Categorify(dtype='int32') >> TagAsUserFeatures()
inputs = user_id + item_id + item_features + user_features + ['click']
outputs = inputs >> Filter(f=lambda df: df["click"] == 1)
transform_aliccp((train, valid), output_path, nvt_workflow=outputs, workflow_name='workflow_retrieval')
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
train_tt = Dataset(os.path.join(output_path, 'train', '*.parquet'))
valid_tt = Dataset(os.path.join(output_path, 'valid', '*.parquet'))
schema = train_tt.schema
schema = schema.select_by_tag([Tags.ITEM_ID, Tags.USER_ID, Tags.ITEM, Tags.USER])
model = mm.TwoTowerModel(
schema,
query_tower=mm.MLPBlock([128, 64], no_activation_last_layer=True),
loss="categorical_crossentropy",
samplers=[mm.InBatchSampler()],
embedding_options = mm.EmbeddingOptions(infer_embedding_sizes=True),
metrics=[mm.RecallAt(10), mm.NDCGAt(10)]
)
model.compile(optimizer='adam', run_eagerly=False)
model.fit(train_tt, validation_data=valid_tt, batch_size=1024*8, epochs=1)
4/5 [=======================>......] - ETA: 0s - recall_at_10: 0.0199 - ndcg_10: 0.0175 - loss: 8.9784 - regularization_loss: 0.0000e+00 - total_loss: 8.9784
2022-05-05 14:35:25.929977: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: cond/branch_executed/_24
5/5 [==============================] - 6s 326ms/step - recall_at_10: 0.0209 - ndcg_10: 0.0185 - loss: 8.5198 - regularization_loss: 0.0000e+00 - total_loss: 8.5198 - val_recall_at_10: 0.0419 - val_ndcg_10: 0.0394 - val_loss: 8.8001 - val_regularization_loss: 0.0000e+00 - val_total_loss: 8.8001
<keras.callbacks.History at 0x7fa052dd8310>
In the following cells we are going to export the required user and item feature files, and save the query (user) tower model and item embeddings to disk. If you want to read more about exporting retrieval models, please visit the 05-Retrieval-Model.ipynb notebook in the Merlin Models library repo.
Set up a feature store with Feast
Before we move onto the next step, we need to create a Feast feature repository.
!cd $BASE_DIR && feast init feature_repo
Creating a new Feast repository in /Merlin/examples/Building-and-deploying-multi-stage-RecSys/feature_repo.
You should see a message like Creating a new Feast repository in … printed above. Now, navigate to the feature_repo folder and remove the example.py file and the demo parquet file that are created by default.
os.remove(os.path.join(BASE_DIR, 'feature_repo', 'example.py'))
os.remove(os.path.join(BASE_DIR, 'feature_repo/data', 'driver_stats.parquet'))
Exporting query (user) model
query_tower = model.retrieval_block.query_block()
query_tower.save(os.path.join(BASE_DIR, 'query_tower'))
Exporting user and item features
from merlin.models.utils.dataset import unique_rows_by_features
user_features = unique_rows_by_features(train, Tags.USER, Tags.USER_ID).compute().reset_index(drop=True)
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
user_features.head()
 | user_id | user_shops | user_profile | user_group | user_gender | user_age | user_consumption_2 | user_is_occupied | user_geography | user_intentions | user_brands | user_categories
---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 14 | 14 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 14 | 14 | 14 |
1 | 38 | 38 | 2 | 1 | 1 | 1 | 1 | 1 | 1 | 38 | 38 | 38 |
2 | 8 | 8 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 8 | 8 | 8 |
3 | 5 | 5 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 5 | 5 | 5 |
4 | 31 | 31 | 2 | 1 | 1 | 1 | 1 | 1 | 1 | 31 | 31 | 31 |
We will artificially add datetime and created timestamp columns to our user_features dataframe. Feast requires these columns to track the user and item features and their creation time, and to determine which feature version to use when we query Feast.
from datetime import datetime
user_features["datetime"] = datetime.now()
user_features["datetime"] = user_features["datetime"].astype("datetime64[ns]")
user_features["created"] = datetime.now()
user_features["created"] = user_features["created"].astype("datetime64[ns]")
user_features.head()
 | user_id | user_shops | user_profile | user_group | user_gender | user_age | user_consumption_2 | user_is_occupied | user_geography | user_intentions | user_brands | user_categories | datetime | created
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 14 | 14 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 14 | 14 | 14 | 2022-05-05 14:35:32.971809 | 2022-05-05 14:35:32.973595 |
1 | 38 | 38 | 2 | 1 | 1 | 1 | 1 | 1 | 1 | 38 | 38 | 38 | 2022-05-05 14:35:32.971809 | 2022-05-05 14:35:32.973595 |
2 | 8 | 8 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 8 | 8 | 8 | 2022-05-05 14:35:32.971809 | 2022-05-05 14:35:32.973595 |
3 | 5 | 5 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 5 | 5 | 5 | 2022-05-05 14:35:32.971809 | 2022-05-05 14:35:32.973595 |
4 | 31 | 31 | 2 | 1 | 1 | 1 | 1 | 1 | 1 | 31 | 31 | 31 | 2022-05-05 14:35:32.971809 | 2022-05-05 14:35:32.973595 |
user_features.to_parquet(os.path.join(BASE_DIR, 'feature_repo/data', 'user_features.parquet'))
item_features = unique_rows_by_features(train, Tags.ITEM, Tags.ITEM_ID).compute().reset_index(drop=True)
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
item_features.shape
(448, 4)
item_features["datetime"] = datetime.now()
item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
item_features["created"] = datetime.now()
item_features["created"] = item_features["created"].astype("datetime64[ns]")
item_features.head()
 | item_id | item_category | item_shop | item_brand | datetime | created
---|---|---|---|---|---|---
0 | 15 | 15 | 15 | 15 | 2022-05-05 14:35:33.084300 | 2022-05-05 14:35:33.086938 |
1 | 2 | 2 | 2 | 2 | 2022-05-05 14:35:33.084300 | 2022-05-05 14:35:33.086938 |
2 | 59 | 59 | 59 | 59 | 2022-05-05 14:35:33.084300 | 2022-05-05 14:35:33.086938 |
3 | 149 | 149 | 149 | 149 | 2022-05-05 14:35:33.084300 | 2022-05-05 14:35:33.086938 |
4 | 63 | 63 | 63 | 63 | 2022-05-05 14:35:33.084300 | 2022-05-05 14:35:33.086938 |
# save to disk
item_features.to_parquet(os.path.join(BASE_DIR, 'feature_repo/data', 'item_features.parquet'))
Extract and save Item embeddings
item_embs = model.item_embeddings(Dataset(item_features, schema=schema), batch_size=1024)
item_embs_df = item_embs.compute(scheduler="synchronous")
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
/core/merlin/io/dataset.py:251: UserWarning: Initializing an NVTabular Dataset in CPU mode.This is an experimental feature with extremely limited support!
warnings.warn(
# select only item_id together with embedding columns
item_embeddings = item_embs_df.drop(columns=['item_category', 'item_shop', 'item_brand'])
item_embeddings.head()
 | item_id | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 54 | 55 | 56 | 57 | 58 | 59 | 60 | 61 | 62 | 63
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 15 | 0.009764 | 0.008560 | -0.008236 | 0.001079 | 0.006261 | 0.009971 | 0.009541 | -0.019058 | 0.036954 | ... | 0.003188 | 0.025160 | 0.003522 | 0.024247 | 0.023871 | -0.017902 | 0.007299 | -0.013606 | 0.003823 | 0.013440 |
1 | 2 | 0.014012 | 0.019497 | -0.015541 | -0.006398 | 0.004327 | 0.077202 | 0.033808 | -0.008815 | 0.062406 | ... | 0.011110 | 0.010548 | -0.021843 | 0.044792 | 0.043012 | -0.014118 | 0.028639 | 0.021822 | -0.008754 | -0.021522 |
2 | 59 | 0.019117 | 0.004479 | -0.031548 | -0.020642 | 0.035912 | 0.015081 | 0.001050 | -0.005041 | 0.014927 | ... | -0.012306 | 0.012168 | -0.012633 | 0.017448 | 0.016328 | -0.022380 | -0.008413 | 0.028804 | 0.009847 | -0.018740 |
3 | 149 | 0.022785 | 0.018213 | -0.035266 | 0.002669 | 0.004770 | 0.023919 | 0.004185 | -0.048692 | 0.013208 | ... | -0.002208 | 0.023864 | -0.019109 | -0.006574 | 0.012167 | 0.009672 | -0.016330 | 0.005729 | 0.025929 | -0.008024 |
4 | 63 | -0.012254 | -0.016057 | -0.017900 | -0.017911 | -0.037520 | 0.012523 | -0.007489 | -0.005636 | 0.017751 | ... | 0.009128 | 0.032535 | -0.020749 | 0.038132 | 0.032030 | -0.052667 | -0.014624 | -0.009040 | -0.029470 | -0.004175 |
5 rows × 65 columns
# save to disk
item_embeddings.to_parquet(os.path.join(BASE_DIR,'item_embeddings.parquet'))
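These exported embeddings are used in the next notebook to build the approximate-nearest-neighbour index that serves the retrieval step. As a preview of what that involves, the sketch below builds a simple exact inner-product Faiss index from the parquet file we just wrote and maps search results back to item ids; treat it as an illustration, since the actual index is configured by Merlin Systems in the next notebook.
# Illustrative sketch: build a brute-force Faiss index over the exported item embeddings
import faiss
import numpy as np
import pandas as pd

emb_df = pd.read_parquet(os.path.join(BASE_DIR, "item_embeddings.parquet"))
item_ids = emb_df["item_id"].to_numpy()
vectors = np.ascontiguousarray(emb_df.drop(columns=["item_id"]).to_numpy(), dtype="float32")

index = faiss.IndexFlatIP(vectors.shape[1])       # exact inner-product search
index.add(vectors)                                # one vector per item

# query with the first item's embedding and map the returned row positions back to item ids
scores, positions = index.search(vectors[:1], 10)
print(item_ids[positions[0]])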
Create feature definitions
Now we will create our user and item feature definitions in the user_features.py and item_features.py files and save these files in the feature_repo folder.
file = open(os.path.join(BASE_DIR, 'feature_repo/','user_features.py'), "w")
file.write(
'''
from google.protobuf.duration_pb2 import Duration
import datetime
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource
user_features = FileSource(
path="{}",
event_timestamp_column="datetime",
created_timestamp_column="created",
)
user = Entity(name="user_id", value_type=ValueType.INT32, description="user id",)
user_features_view = FeatureView(
name="user_features",
entities=["user_id"],
ttl=Duration(seconds=86400 * 7),
features=[
Feature(name="user_shops", dtype=ValueType.INT32),
Feature(name="user_profile", dtype=ValueType.INT32),
Feature(name="user_group", dtype=ValueType.INT32),
Feature(name="user_gender", dtype=ValueType.INT32),
Feature(name="user_age", dtype=ValueType.INT32),
Feature(name="user_consumption_2", dtype=ValueType.INT32),
Feature(name="user_is_occupied", dtype=ValueType.INT32),
Feature(name="user_geography", dtype=ValueType.INT32),
Feature(name="user_intentions", dtype=ValueType.INT32),
Feature(name="user_brands", dtype=ValueType.INT32),
Feature(name="user_categories", dtype=ValueType.INT32),
],
online=True,
input=user_features,
tags=dict(),
)
'''.format(os.path.join(BASE_DIR, 'feature_repo/data/','user_features.parquet'))
)
file.close()
with open(os.path.join(BASE_DIR, 'feature_repo/','item_features.py'), "w") as f:
f.write(
'''
from google.protobuf.duration_pb2 import Duration
import datetime
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.file_source import FileSource
item_features = FileSource(
path="{}",
event_timestamp_column="datetime",
created_timestamp_column="created",
)
item = Entity(name="item_id", value_type=ValueType.INT32, description="item id",)
item_features_view = FeatureView(
name="item_features",
entities=["item_id"],
ttl=Duration(seconds=86400 * 7),
features=[
Feature(name="item_category", dtype=ValueType.INT32),
Feature(name="item_shop", dtype=ValueType.INT32),
Feature(name="item_brand", dtype=ValueType.INT32),
],
online=True,
input=item_features,
tags=dict(),
)
'''.format(os.path.join(BASE_DIR, 'feature_repo/data/','item_features.parquet'))
)
# the `with` block closes the file automatically
Let’s check out our Feast feature repository structure.
# install seedir
!pip install seedir
import seedir as sd
feature_repo_path = os.path.join(BASE_DIR, 'feature_repo')
sd.seedir(feature_repo_path, style='lines', itemlimit=10, depthlimit=3, exclude_folders='.ipynb_checkpoints', sort=True)
feature_repo/
├─__init__.py
├─data/
│ ├─item_features.parquet
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
└─user_features.py
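Note that these feature definitions are not served online yet: they still need to be registered and materialized into Feast’s online store (with feast apply and feast materialize), which is part of the deployment setup in the next notebook. As a preview, once that is done an online lookup would look roughly like the commented sketch below.
# Preview (not runnable until the repo is applied and materialized in the next notebook)
# from feast import FeatureStore
# store = FeatureStore(repo_path=os.path.join(BASE_DIR, "feature_repo"))
# online_features = store.get_online_features(
#     features=["user_features:user_shops", "user_features:user_age"],
#     entity_rows=[{"user_id": 14}],
# ).to_dict()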
Next Steps
We trained and exported our ranking and retrieval models and NVTabular workflows. In the next step, we will learn how to deploy our trained models to Triton Inference Server (TIS) with the Merlin Systems library.
For the next step, move on to the 02-Deploying-multi-stage-Recsys-with-Merlin-Systems.ipynb notebook to deploy our saved models as an ensemble to TIS and obtain prediction results for a given request.