In HugeCTR version 3.5, we provide Python APIs for embedding table lookup with the HugeCTR Hierarchical Parameter Server (HPS). HPS supports different database backends and GPU embedding caches.

This notebook demonstrates how to use HPS with the HugeCTR Python APIs. Without loss of generality, the HPS APIs are used together with the ONNX Runtime APIs to build an ensemble inference model, in which HPS handles the embedding table lookup and the ONNX model runs the forward pass of the dense layers.
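
To make the division of labor concrete, here is a minimal pure-Python sketch of a two-stage ensemble. It is only an illustration: `embedding_lookup`, `dense_forward`, and `toy_table` are hypothetical stand-ins, not HugeCTR or ONNX Runtime APIs, and the dense stage is reduced to one linear layer followed by a sigmoid.

```python
import math
from typing import Dict, List, Sequence


def embedding_lookup(table: Dict[int, List[float]], keys: Sequence[int]) -> List[List[float]]:
    """Stage 1 (the role HPS plays): map each key to its embedding vector."""
    return [table[key] for key in keys]


def dense_forward(dense: Sequence[float], embeddings: Sequence[Sequence[float]],
                  weights: Sequence[float], bias: float) -> float:
    """Stage 2 (the role the ONNX model plays): concatenate, then linear layer + sigmoid."""
    # Embedding vectors come first and dense features last, as in the Concat layer later on.
    features = [x for vec in embeddings for x in vec] + list(dense)
    logit = sum(w * x for w, x in zip(weights, features)) + bias
    return 1.0 / (1.0 + math.exp(-logit))


# Hypothetical two-row embedding table with 2-dimensional vectors.
toy_table = {0: [0.1, 0.2], 1: [0.3, 0.4]}
vectors = embedding_lookup(toy_table, [1, 0])
prediction = dense_forward([1.0], vectors, weights=[0.0] * 5, bias=0.0)
```

The point of the split is that the large embedding tables can live in a parameter server, while the comparatively small dense network is served by a standard inference runtime.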


Get HugeCTR from NGC

The HugeCTR Python module is preinstalled in the 22.09 and later Merlin Training Container:

You can check the existence of required libraries by running the following Python code after launching this container.

$ python3 -c "import hugectr"

Note: This Python module contains both training APIs and offline inference APIs. For online inference with Triton, please refer to HugeCTR Backend.

If you prefer to build HugeCTR from the source code instead of using the NGC container, please refer to the How to Start Your Development documentation.

Data Generation

HugeCTR provides a tool to generate synthetic datasets. The Data Generator is capable of generating datasets of different file formats and different distributions. We will generate one-hot Parquet datasets with power-law distribution for this notebook:

import hugectr
from hugectr.tools import DataGeneratorParams, DataGenerator

data_generator_params = DataGeneratorParams(
  format = hugectr.DataReaderType_t.Parquet,
  label_dim = 1,
  dense_dim = 10,
  num_slot = 4,
  i64_input_key = True,
  nnz_array = [1, 1, 1, 1],
  source = "./data_parquet/file_list.txt",
  eval_source = "./data_parquet/file_list_test.txt",
  slot_size_array = [10000, 10000, 10000, 10000],
  check_type = hugectr.Check_t.Non,
  dist_type = hugectr.Distribution_t.PowerLaw,
  power_law_type = hugectr.PowerLaw_t.Short,
  num_files = 16,
  eval_num_files = 4,
  num_samples_per_file = 40960)
data_generator = DataGenerator(data_generator_params)
# Write the synthetic Parquet files and file lists to disk
data_generator.generate()
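
As a quick sanity check on these settings, the following sketch works out how much data they imply. It assumes that evaluation files hold the same number of samples per file as training files, and it uses the batch size of 1024 that the training step later in this notebook passes to the solver.

```python
# Values copied from the DataGeneratorParams in this notebook.
num_files = 16
eval_num_files = 4
num_samples_per_file = 40960
batchsize = 1024  # batch size used later for training

train_samples = num_files * num_samples_per_file
eval_samples = eval_num_files * num_samples_per_file  # assumes the same samples per file
batches_per_epoch = train_samples // batchsize
batches_per_file = num_samples_per_file // batchsize

print(train_samples, eval_samples, batches_per_epoch, batches_per_file)
```

With these numbers, one pass over the training set is 640 batches, so a run longer than that wraps around the dataset.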

Train from Scratch

We can train from scratch by performing the following steps with the Python APIs:

  1. Create the solver, reader and optimizer, then initialize the model.

  2. Construct the model graph by adding input, sparse embedding and dense layers in order.

  3. Compile the model and have an overview of the model graph.

  4. Dump the model graph to the JSON file.

  5. Fit the model, save the model weights and optimizer states implicitly.

  6. Dump one batch of evaluation results to files.

import hugectr
from mpi4py import MPI
solver = hugectr.CreateSolver(model_name = "hps_demo",
                              max_eval_batches = 1,
                              batchsize_eval = 1024,
                              batchsize = 1024,
                              lr = 0.001,
                              vvgpu = [[0]],
                              i64_input_key = True,
                              repeat_dataset = True,
                              use_cuda_graph = True)
reader = hugectr.DataReaderParams(data_reader_type = hugectr.DataReaderType_t.Parquet,
                                  source = ["./data_parquet/file_list.txt"],
                                  eval_source = "./data_parquet/file_list_test.txt",
                                  check_type = hugectr.Check_t.Non,
                                  slot_size_array = [10000, 10000, 10000, 10000])
optimizer = hugectr.CreateOptimizer(optimizer_type = hugectr.Optimizer_t.Adam)
model = hugectr.Model(solver, reader, optimizer)
model.add(hugectr.Input(label_dim = 1, label_name = "label",
                        dense_dim = 10, dense_name = "dense",
                        data_reader_sparse_param_array = 
                        [hugectr.DataReaderSparseParam("data1", [1, 1], True, 2),
                        hugectr.DataReaderSparseParam("data2", [1, 1], True, 2)]))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 4,
                            embedding_vec_size = 16,
                            combiner = "sum",
                            sparse_embedding_name = "sparse_embedding1",
                            bottom_name = "data1",
                            optimizer = optimizer))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 8,
                            embedding_vec_size = 32,
                            combiner = "sum",
                            sparse_embedding_name = "sparse_embedding2",
                            bottom_name = "data2",
                            optimizer = optimizer))
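# Flatten each embedding output to (batch, slots * embedding_vec_size)
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Reshape,
                            bottom_names = ["sparse_embedding1"],
                            top_names = ["reshape1"],
                            leading_dim = 32))  # 2 slots * 16
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Reshape,
                            bottom_names = ["sparse_embedding2"],
                            top_names = ["reshape2"],
                            leading_dim = 64))  # 2 slots * 32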
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Concat,
                            bottom_names = ["reshape1", "reshape2", "dense"], top_names = ["concat1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["concat1"],
                            top_names = ["fc1"],
                            num_output = 1024))  # assumed hidden-layer width
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc1"],
                            top_names = ["relu1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["relu1"],
                            top_names = ["fc2"],
                            num_output = 1))  # one logit for the binary label
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.BinaryCrossEntropyLoss,
                            bottom_names = ["fc2", "label"],
                            top_names = ["loss"]))
model.compile()
model.summary()
model.graph_to_json("hps_demo.json")
model.fit(max_iter = 1100, display = 200, eval_interval = 1000, snapshot = 1000, snapshot_prefix = "hps_demo")
model.export_predictions("hps_demo_pred_" + str(1000), "hps_demo_label_" + str(1000))
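
The widths that the Reshape and Concat layers work with follow directly from the model settings. The sketch below derives them under the assumption that each embedding layer emits one vector per slot (two slots per table here, with the `sum` combiner); the variable names are illustrative only.

```python
# Settings taken from the model definition in this notebook.
slots_per_table = 2  # each DataReaderSparseParam covers 2 slots
vec_sizes = {"sparse_embedding1": 16, "sparse_embedding2": 32}
dense_dim = 10

# Flattened width of each embedding output: slots * embedding_vec_size.
reshape_widths = {name: slots_per_table * size for name, size in vec_sizes.items()}

# Width of the concatenated feature vector fed to the first fully connected layer.
concat_width = sum(reshape_widths.values()) + dense_dim

print(reshape_widths, concat_width)
```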

Convert HugeCTR to ONNX

We will convert the saved HugeCTR models to ONNX using the HugeCTR to ONNX Converter. For more information about the converter, refer to the README in the onnx_converter directory of the repository.

To double-check correctness, we convert the model twice: once with the sparse embedding models included and once without them.

import hugectr2onnx
hugectr2onnx.converter.convert(onnx_model_path = "hps_demo_with_embedding.onnx",
                            graph_config = "hps_demo.json",
                            dense_model = "hps_demo_dense_1000.model",
                            convert_embedding = True,
                            sparse_models = ["hps_demo0_sparse_1000.model", "hps_demo1_sparse_1000.model"])

hugectr2onnx.converter.convert(onnx_model_path = "hps_demo_without_embedding.onnx",
                            graph_config = "hps_demo.json",
                            dense_model = "hps_demo_dense_1000.model",
                            convert_embedding = False)
The model is checked!
The model is saved at hps_demo_with_embedding.onnx
Skip sparse embedding layers in converted ONNX model
Skip sparse embedding layers in converted ONNX model
The model is checked!
The model is saved at hps_demo_without_embedding.onnx
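
The model file names passed to the converter line up with the `snapshot_prefix` and `snapshot` values used during training. The helper below reproduces them; note that the naming pattern is inferred only from the file names that appear in this notebook, and `snapshot_files` is a hypothetical helper, not part of HugeCTR.

```python
from typing import List, Tuple


def snapshot_files(prefix: str, num_embeddings: int, iteration: int) -> Tuple[List[str], str]:
    """Build the sparse and dense model file names used in this notebook."""
    sparse = [f"{prefix}{i}_sparse_{iteration}.model" for i in range(num_embeddings)]
    dense = f"{prefix}_dense_{iteration}.model"
    return sparse, dense


sparse_models, dense_model = snapshot_files("hps_demo", num_embeddings=2, iteration=1000)
print(sparse_models, dense_model)
```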

Inference with HPS & ONNX

We will run inference by performing the following steps with the Python APIs:

  1. Configure the HPS hyperparameters.

  2. Initialize the HPS object, which is responsible for embedding table lookup.

  3. Load the Parquet data.

  4. Run inference with the HPS object and the ONNX inference session of hps_demo_without_embedding.onnx.

  5. Check the correctness by comparing with the dumped evaluation results.

  6. Run inference with the ONNX inference session of hps_demo_with_embedding.onnx (double check).

from hugectr.inference import HPS, ParameterServerConfig, InferenceParams

import pandas as pd
import numpy as np

import onnxruntime as ort

slot_size_array = [10000, 10000, 10000, 10000]
key_offset = np.insert(np.cumsum(slot_size_array), 0, 0)[:-1]
batch_size = 1024

# 1. Configure the HPS hyperparameters
ps_config = ParameterServerConfig(
           emb_table_name = {"hps_demo": ["sparse_embedding1", "sparse_embedding2"]},
           embedding_vec_size = {"hps_demo": [16, 32]},
           max_feature_num_per_sample_per_emb_table = {"hps_demo": [2, 2]},
           inference_params_array = [
              InferenceParams(
                model_name = "hps_demo",
                max_batchsize = batch_size,
                hit_rate_threshold = 1.0,
                dense_model_file = "",
                sparse_model_files = ["hps_demo0_sparse_1000.model", "hps_demo1_sparse_1000.model"],
                deployed_devices = [0],
                use_gpu_embedding_cache = True,
                cache_size_percentage = 0.5,
                i64_input_key = True)
           ])

# 2. Initialize the HPS object
hps = HPS(ps_config)

# 3. Loading the Parquet data.
df = pd.read_parquet("data_parquet/val/gen_0.parquet")
dense_input_columns = df.columns[1:11]
cat_input1_columns = df.columns[11:13]
cat_input2_columns = df.columns[13:15]
dense_input = df[dense_input_columns].loc[0:batch_size-1].to_numpy(dtype=np.float32)
cat_input1 = (df[cat_input1_columns].loc[0:batch_size-1].to_numpy(dtype=np.int64) + key_offset[0:2]).reshape((batch_size, 2, 1))
cat_input2 = (df[cat_input2_columns].loc[0:batch_size-1].to_numpy(dtype=np.int64) + key_offset[2:4]).reshape((batch_size, 2, 1))

# 4. Make inference from the HPS object and the ONNX inference session of `hps_demo_without_embedding.onnx`.
embedding1 = hps.lookup(cat_input1.flatten(), "hps_demo", 0).reshape(batch_size, 2, 16)
embedding2 = hps.lookup(cat_input2.flatten(), "hps_demo", 1).reshape(batch_size, 2, 32)
sess = ort.InferenceSession("hps_demo_without_embedding.onnx")
res = sess.run(output_names=[sess.get_outputs()[0].name],
               input_feed={sess.get_inputs()[0].name: dense_input,
               sess.get_inputs()[1].name: embedding1,
               sess.get_inputs()[2].name: embedding2})
pred = res[0]

# 5. Check the correctness by comparing with dumped evaluation results.
ground_truth = np.loadtxt("hps_demo_pred_1000")
print("ground_truth: ", ground_truth)
diff = pred.flatten()-ground_truth
mse = np.mean(diff*diff)
print("pred: ", pred)
print("mse between pred and ground_truth: ", mse)

# 6. Make inference with the ONNX inference session of `hps_demo_with_embedding.onnx` (double check).
sess_ref = ort.InferenceSession("hps_demo_with_embedding.onnx")
res_ref = sess_ref.run(output_names=[sess_ref.get_outputs()[0].name],
                   input_feed={sess_ref.get_inputs()[0].name: dense_input,
                   sess_ref.get_inputs()[1].name: cat_input1,
                   sess_ref.get_inputs()[2].name: cat_input2})
pred_ref = res_ref[0]
diff_ref = pred_ref.flatten()-ground_truth
mse_ref = np.mean(diff_ref*diff_ref)
print("pred_ref: ", pred_ref)
print("mse between pred_ref and ground_truth: ", mse_ref)
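
The inference code above shifts each categorical column by `key_offset` before querying HPS. The following pure-Python sketch shows what that offset does, assuming the Parquet columns hold per-slot local IDs in the range `[0, slot_size)`. It mirrors the NumPy expression `np.insert(np.cumsum(slot_size_array), 0, 0)[:-1]`; `to_global_keys` is a hypothetical helper.

```python
from itertools import accumulate

slot_size_array = [10000, 10000, 10000, 10000]

# Exclusive prefix sum: the first global key of each slot.
key_offset = [0] + list(accumulate(slot_size_array))[:-1]


def to_global_keys(local_ids, first_slot):
    """Shift per-slot local IDs into the global key space, slot by slot."""
    return [local_id + key_offset[first_slot + i] for i, local_id in enumerate(local_ids)]


# Slots 0 and 1 feed the first embedding table, slots 2 and 3 the second.
table1_keys = to_global_keys([37, 57], first_slot=0)
table2_keys = to_global_keys([37, 57], first_slot=2)
print(key_offset, table1_keys, table2_keys)
```

Offsetting keeps IDs from different slots from colliding, since every slot numbers its categories from zero.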

Lookup the Embedding Vector from DLPack

We also provide a lookup_fromdlpack interface that can query embedding keys stored on the CPU and return the embedding vectors on the GPU or CPU.

  1. Suppose you have created a PyTorch or TensorFlow tensor that stores the embedding keys.

  2. Convert the embedding key tensor to a DLPack capsule through the corresponding framework’s to_dlpack function.

  3. Create an empty tensor as a buffer to store the embedding vectors.

  4. Convert the buffer tensor to a DLPack capsule.

  5. Look up the embedding vectors of the corresponding embedding keys directly through the lookup_fromdlpack interface, which writes them to the embedding vector buffer tensor.

  6. If the output capsule is allocated on the GPU, a device_id needs to be specified in the lookup_fromdlpack interface to select the corresponding embedding cache. If it is not specified, the default is device 0.

Note: Please make sure that TensorFlow or PyTorch has been installed correctly in the container.
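
The buffer created in step 3 must be large enough to hold one vector per key. As a rough guide, the sketch below computes the buffer shape used for table 0 in this notebook, assuming float32 vectors and two keys per sample; the variable names are illustrative.

```python
batch_size = 1024
slots_per_table = 2      # keys per sample for each table in this notebook
embedding_vec_size = 16  # vector size of table 0 ("sparse_embedding1")

num_keys = batch_size * slots_per_table

# Flat buffer shape handed to the lookup, and the shape it is reshaped to afterwards.
flat_buffer_shape = (1, num_keys * embedding_vec_size)
reshaped = (batch_size, slots_per_table, embedding_vec_size)

# Four bytes per float32 element.
buffer_bytes = num_keys * embedding_vec_size * 4

print(num_keys, flat_buffer_shape, reshaped, buffer_bytes)
```

For table 1, the same arithmetic applies with an embedding vector size of 32.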

embedding1 = hps.lookup(cat_input1.flatten(), "hps_demo", 0).reshape(batch_size, 2, 16)
embedding2 = hps.lookup(cat_input2.flatten(), "hps_demo", 1).reshape(batch_size, 2, 32)

# 1. Look up from dlpack for Pytorch tensor on CPU
print(" Look up from dlpack for Pytorch tensor")
import torch.utils.dlpack
import os
print("************Look up from pytorch dlpack on CPU")
device = torch.device("cpu")
key = torch.tensor(cat_input1.flatten(),dtype=torch.int64, device=device)
out = torch.empty((1,cat_input1.flatten().shape[0]*16), dtype=torch.float32, device=device)
key_capsule = torch.utils.dlpack.to_dlpack(key)
print("The device type of embedding keys that lookup dlpack from hps interface for embedding table 0 of hps_demo: {}, the keys: {}".format(key.device, key))
out_capsule = torch.utils.dlpack.to_dlpack(out)
# Lookup the embedding vectors from dlpack
hps.lookup_fromdlpack(key_capsule, out_capsule,"hps_demo", 0)
out_put = torch.utils.dlpack.from_dlpack(out_capsule)
print("[The device type of embedding vectors that lookup dlpack from hps interface for embedding table 0 of hps_demo: {}, the vectors: {}\n".format(out_put.device, out_put))
diff = out_put - embedding1.reshape(1, cat_input1.flatten().shape[0]*16)
# Mean squared error, so that positive and negative differences cannot cancel
mse = (diff * diff).mean()
if mse > 1e-4:
    raise RuntimeError("Too large mse between pytorch dlpack on cpu and native HPS lookup api: {}".format(mse))
else:
    print("Pytorch dlpack on cpu  results are consistent with native HPS lookup api, mse: {}".format(mse))

# 2. Look up from dlpack for Pytorch tensor on GPU
print("************Look up from pytorch dlpack on GPU")
cuda_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
out = torch.empty((1,cat_input1.flatten().shape[0]*16), dtype=torch.float32, device=cuda_device)
out_capsule = torch.utils.dlpack.to_dlpack(out)
hps.lookup_fromdlpack(key_capsule, out_capsule,"hps_demo", 0)
out_put = torch.utils.dlpack.from_dlpack(out_capsule)
print("The device type of embedding vectors that lookup dlpack from hps interface for embedding table 0 of hps_demo: {}, the vectors: {}\n\n".format(out_put.device, out_put))
diff = out_put.cpu() - embedding1.reshape(1, cat_input1.flatten().shape[0]*16)
# Mean squared error, so that positive and negative differences cannot cancel
mse = (diff * diff).mean()
if mse > 1e-3:
    raise RuntimeError("Too large mse between pytorch dlpack on GPU and native HPS lookup api: {}".format(mse))
else:
    print("Pytorch dlpack on GPU results are consistent with native HPS lookup api, mse: {}".format(mse))
 Look up from dlpack for Pytorch tensor
************Look up from pytorch dlpack on CPU
The device type of embedding keys that lookup dlpack from hps interface for embedding table 0 of hps_demo: cpu, the keys: tensor([    0, 10000,     0,  ..., 10037,    57, 10057])
[The device type of embedding vectors that lookup dlpack from hps interface for embedding table 0 of hps_demo: cpu, the vectors: tensor([[-0.0843,  0.0634,  0.0409,  ..., -0.0584,  0.0030, -0.0187]])

Pytorch dlpack on cpu  results are consistent with native HPS lookup api, mse: 0.0
************Look up from pytorch dlpack on GPU
The device type of embedding vectors that lookup dlpack from hps interface for embedding table 0 of hps_demo: cuda:0, the vectors: tensor([[-0.0843,  0.0634,  0.0409,  ..., -0.0584,  0.0030, -0.0187]],

Pytorch dlpack on GPU results are consistent with native HPS lookup api, mse: 0.0
# 3. Look up from dlpack for tensorflow tensor on CPU
print("Look up from dlpack for Tensorflow tensor")
from tensorflow.python.dlpack import dlpack  
import tensorflow as tf
from tensorflow.python.eager import context
from tensorflow.python.framework import dtypes
print("***************Look up from tensorflow dlpack on CPU**********")
with tf.device('/CPU:0'):
    key_tensor = tf.constant(cat_input2.flatten(),dtype=tf.int64)
    out_tensor = tf.zeros([1, cat_input2.flatten().shape[0]*32],dtype=tf.float32)
    print("The device type of embedding keys that lookup dlpack from hps interface for embedding table 1 of hps_demo: {}, the keys: {}".format(key_tensor.device, key_tensor))
    key_capsule = tf.experimental.dlpack.to_dlpack(key_tensor)
    out_dlcapsule = tf.experimental.dlpack.to_dlpack(out_tensor)
hps.lookup_fromdlpack(key_capsule,out_dlcapsule, "hps_demo", 1)
out= tf.experimental.dlpack.from_dlpack(out_dlcapsule)
print("The device type of embedding vectors that lookup dlpack from hps interface for embedding table 1 of hps_demo: {}, the vectors: {}\n".format(out.device, out))
diff = out - embedding2.reshape(1, cat_input2.flatten().shape[0]*32)
# Mean squared error, so that positive and negative differences cannot cancel
mse = tf.reduce_mean(diff * diff)
if mse > 1e-3:
    raise RuntimeError("Too large mse between tensorflow dlpack on cpu and native HPS lookup api: {}".format(mse))
else:
    print("tensorflow dlpack on CPU results are consistent with native HPS lookup api, mse: {}".format(mse))
# 4. Look up from dlpack for tensorflow tensor on GPU
print("***************Look up from tensorflow dlpack on GPU**********")
with tf.device('/GPU:0'):
    out_tensor = tf.zeros([1, cat_input2.flatten().shape[0]*32],dtype=tf.float32)
    key_capsule = tf.experimental.dlpack.to_dlpack(key_tensor)
    out_dlcapsule = tf.experimental.dlpack.to_dlpack(out_tensor)
hps.lookup_fromdlpack(key_capsule,out_dlcapsule, "hps_demo", 1)
out= tf.experimental.dlpack.from_dlpack(out_dlcapsule)
print("[HUGECTR][INFO] The device type of embedding vectors that lookup dlpack from hps interface for embedding table 1 of wdl: {}, the vectors: {}\n".format(out.device, out))
diff = out - embedding2.reshape(1, cat_input2.flatten().shape[0]*32)
# Mean squared error, so that positive and negative differences cannot cancel
mse = tf.reduce_mean(diff * diff)
if mse > 1e-3:
    raise RuntimeError("Too large mse between tensorflow dlpack on GPU and native HPS lookup api: {}".format(mse))
else:
    print("tensorflow dlpack on GPU results are consistent with native HPS lookup api, mse: {}".format(mse))
Look up from dlpack for Tensorflow tensor
***************Look up from tensorflow dlpack on CPU**********
The device type of embedding keys that lookup dlpack from hps interface for embedding table 1 of hps_demo: /job:localhost/replica:0/task:0/device:CPU:0, the keys: [20000 30000 20000 ... 30037 20057 30057]
The device type of embedding vectors that lookup dlpack from hps interface for embedding table 1 of hps_demo: /job:localhost/replica:0/task:0/device:CPU:0, the vectors: [[ 0.04648086  0.06154778 -0.04931969 ...  0.00693844  0.04137739

tensorflow dlpack on CPU results are consistent with native HPS lookup api, mse: 0.0
***************Look up from tensorflow dlpack on GPU**********
[HUGECTR][INFO] The device type of embedding vectors that lookup dlpack from hps interface for embedding table 1 of wdl: /job:localhost/replica:0/task:0/device:GPU:0, the vectors: [[ 0.04648086  0.06154778 -0.04931969 ...  0.00693844  0.04137739

tensorflow dlpack on GPU results are consistent with native HPS lookup api, mse: 0.0
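
When comparing two lookup paths, a mean of squared differences is a safer consistency metric than a mean of signed differences, because signed errors of opposite sign can cancel out. The small sketch below illustrates this with made-up numbers; both helper functions are hypothetical and not part of HugeCTR.

```python
def mean_signed_error(a, b):
    """Average of the raw differences; opposite-sign errors cancel."""
    return sum(x - y for x, y in zip(a, b)) / len(a)


def mean_squared_error(a, b):
    """Average of the squared differences; every mismatch contributes."""
    return sum((x - y) ** 2 for x, y in zip(a, b)) / len(a)


reference = [0.0, 0.0]
candidate = [1.0, -1.0]  # clearly different from the reference

signed = mean_signed_error(candidate, reference)
squared = mean_squared_error(candidate, reference)
print(signed, squared)
```

Here the signed mean reports no error at all, while the squared mean exposes the mismatch.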