# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

NVTabular demo on Rossmann data - TensorFlow

Overview

NVTabular is a feature engineering and preprocessing library for tabular data, designed to quickly and easily manipulate the terabyte-scale datasets used to train deep learning-based recommender systems. It provides a high-level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS cuDF library.

Learning objectives

In the previous notebooks (01-Download-Convert.ipynb and 02-ETL-with-NVTabular.ipynb), we downloaded, preprocessed, and created features for the dataset. Now, we are ready to train our deep learning model. In this notebook, we use TensorFlow together with the NVTabular data loader for TensorFlow to accelerate the training pipeline.

import os
import math
import json
import glob

Loading NVTabular workflow

This time, we only need to define our data directories. We can load the data schema and statistics that were saved by the NVTabular workflow in the previous notebook.

DATA_DIR = os.environ.get("INPUT_DATA_DIR", os.path.expanduser("~/nvt-examples/data/"))
PREPROCESS_DIR = os.path.join(DATA_DIR, "ross_pre/")
PREPROCESS_DIR_TRAIN = os.path.join(PREPROCESS_DIR, "train")
PREPROCESS_DIR_VALID = os.path.join(PREPROCESS_DIR, "valid")

What files are available to train on in our directories?

!ls $PREPROCESS_DIR
stats.json  train  valid
!ls $PREPROCESS_DIR_TRAIN
0.a76e5a5e3ae14edca780ed3f276b6c7a.parquet  _metadata
_file_list.txt				    _metadata.json
!ls $PREPROCESS_DIR_VALID
0.01c5ac717e6e42ccad7feb3f8eaab315.parquet  _metadata
_file_list.txt				    _metadata.json

We load the data schema and statistics from stats.json, which we created in the previous notebook (02-ETL-with-NVTabular.ipynb).

with open(os.path.join(PREPROCESS_DIR, "stats.json"), "r") as stats_file:
    stats = json.load(stats_file)
CATEGORICAL_COLUMNS = stats["CATEGORICAL_COLUMNS"]
CONTINUOUS_COLUMNS = stats["CONTINUOUS_COLUMNS"]
LABEL_COLUMNS = stats["LABEL_COLUMNS"]
COLUMNS = CATEGORICAL_COLUMNS + CONTINUOUS_COLUMNS + LABEL_COLUMNS

The embedding table shows the cardinality of each categorical variable along with its associated embedding size. Each entry is of the form (cardinality, embedding_size).

EMBEDDING_TABLE_SHAPES = stats["EMBEDDING_TABLE_SHAPES"]
EMBEDDING_TABLE_SHAPES
{'Store': [1116, 81],
 'DayOfWeek': [8, 16],
 'Year': [4, 16],
 'Month': [13, 16],
 'Day': [32, 16],
 'StateHoliday': [3, 16],
 'CompetitionMonthsOpen': [26, 16],
 'Promo2Weeks': [27, 16],
 'StoreType': [5, 16],
 'Assortment': [4, 16],
 'PromoInterval': [4, 16],
 'CompetitionOpenSinceYear': [24, 16],
 'Promo2SinceYear': [9, 16],
 'State': [13, 16],
 'Week': [53, 16],
 'Events': [22, 16],
 'Promo_fw': [9, 16],
 'Promo_bw': [9, 16],
 'StateHoliday_fw': [6, 16],
 'StateHoliday_bw': [6, 16],
 'SchoolHoliday_fw': [9, 16],
 'SchoolHoliday_bw': [9, 16]}
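
The same shapes can also be recovered directly from the fitted workflow instead of stats.json. A minimal sketch, assuming the workflow from the ETL notebook was saved under DATA_DIR/workflow (the same location the Triton export step at the end of this notebook loads from):

import nvtabular as nvt
from nvtabular.ops import get_embedding_sizes

# load the fitted workflow saved by the ETL notebook (path is an assumption)
saved_workflow = nvt.Workflow.load(os.path.join(DATA_DIR, "workflow"))
# returns a dict of {column_name: (cardinality, embedding_dim)} for the categorical columns
get_embedding_sizes(saved_workflow)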

Training a Network

Now that our data is preprocessed and saved, we can use data loaders that read the preprocessed parquet files in an online fashion while training the neural network.

We’ll start by setting some universal hyperparameters for our model and optimizer. These settings will be the same across all of the frameworks that we explore in the different notebooks.

A 12% RMSPE is achievable using the Novograd optimizer, but we know of no Novograd implementation for TensorFlow that supports sparse gradients, so we do not include that solution below. If you’re interested in contributing to NVTabular, feel free to take on this challenge and submit a pull request if you’re successful.

EMBEDDING_DROPOUT_RATE = 0.04
DROPOUT_RATES = [0.001, 0.01]
HIDDEN_DIMS = [1000, 500]
BATCH_SIZE = 65536
LEARNING_RATE = 0.001
EPOCHS = 25

# TODO: Calculate on the fly rather than recalling from previous analysis.
MAX_SALES_IN_TRAINING_SET = 38722.0
MAX_LOG_SALES_PREDICTION = 1.2 * math.log(MAX_SALES_IN_TRAINING_SET + 1.0)

TRAIN_PATHS = sorted(glob.glob(os.path.join(PREPROCESS_DIR_TRAIN, "*.parquet")))
VALID_PATHS = sorted(glob.glob(os.path.join(PREPROCESS_DIR_VALID, "*.parquet")))
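
The TODO above notes that MAX_SALES_IN_TRAINING_SET is hard-coded from a previous analysis. A sketch of computing it on the fly instead, assuming the raw training table produced by 01-Download-Convert.ipynb is available as train.csv in DATA_DIR with an untransformed Sales column (hypothetical path and column name; adjust to your setup):

import cudf

# read only the Sales column from the raw (pre-log-transform) training data
raw_sales = cudf.read_csv(os.path.join(DATA_DIR, "train.csv"), usecols=["Sales"])["Sales"]
MAX_SALES_IN_TRAINING_SET = float(raw_sales.max())
MAX_LOG_SALES_PREDICTION = 1.2 * math.log(MAX_SALES_IN_TRAINING_SET + 1.0)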

TensorFlow

TensorFlow: Preparing Datasets

KerasSequenceLoader wraps a lightweight iterator around a dataset object to handle chunking, shuffling, and application of any workflows (which can be applied online as a preprocessing step). For column names, you can use either a list of string names or a list of TensorFlow feature_columns that will be used to feed the network.

import tensorflow as tf

# we can control how much memory to give tensorflow with this environment variable
# IMPORTANT: make sure you do this before you initialize TF's runtime, otherwise
# it's too late and TF will have claimed all free GPU memory
# set only one of the two options below: an explicit amount in MB, or a fraction of free memory
# os.environ["TF_MEMORY_ALLOCATION"] = "8192"  # explicit MB
os.environ["TF_MEMORY_ALLOCATION"] = "0.5"  # fraction of free memory
from nvtabular.loader.tensorflow import KerasSequenceLoader, KerasSequenceValidater


# small wrapper to keep the column definitions tidy
def make_categorical_embedding_column(name, dictionary_size, embedding_dim):
    return tf.feature_column.embedding_column(
        tf.feature_column.categorical_column_with_identity(name, dictionary_size), embedding_dim
    )


# instantiate our columns
categorical_columns = [
    make_categorical_embedding_column(name, *EMBEDDING_TABLE_SHAPES[name])
    for name in CATEGORICAL_COLUMNS
]
continuous_columns = [tf.feature_column.numeric_column(name, (1,)) for name in CONTINUOUS_COLUMNS]

# feed them to our datasets
train_dataset = KerasSequenceLoader(
    TRAIN_PATHS,  # you could also use a glob pattern
    feature_columns=categorical_columns + continuous_columns,
    batch_size=BATCH_SIZE,
    label_names=LABEL_COLUMNS,
    shuffle=True,
    buffer_size=0.06,  # amount of data, as a fraction of GPU memory, to load at once
)

valid_dataset = KerasSequenceLoader(
    VALID_PATHS,  # you could also use a glob pattern
    feature_columns=categorical_columns + continuous_columns,
    batch_size=BATCH_SIZE * 4,
    label_names=LABEL_COLUMNS,
    shuffle=False,
    buffer_size=0.06,  # amount of data, as a fraction of GPU memory, to load at once
)
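
Before defining the model, it can help to peek at a single batch. A quick sanity check, assuming (as with other Keras Sequence-style loaders) that the loader is iterable and yields a (features, labels) pair per batch, with features keyed by column name:

# pull one batch to inspect what the loader produces (this consumes data from the loader)
features, labels = next(iter(train_dataset))
print({name: tensor.shape for name, tensor in features.items()})
print(labels.shape)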

TensorFlow: Defining a Model

Using Keras, we can define the layers of our model and their parameters explicitly. Here, for the sake of consistency, we’ll mimic fast.ai’s TabularModel.

# DenseFeatures layer needs a dictionary of {feature_name: input}
categorical_inputs = {}
for column_name in CATEGORICAL_COLUMNS:
    categorical_inputs[column_name] = tf.keras.Input(name=column_name, shape=(1,), dtype=tf.int64)
categorical_embedding_layer = tf.keras.layers.DenseFeatures(categorical_columns)
categorical_x = categorical_embedding_layer(categorical_inputs)
categorical_x = tf.keras.layers.Dropout(EMBEDDING_DROPOUT_RATE)(categorical_x)

# Just concatenating continuous, so can use a list
continuous_inputs = []
for column_name in CONTINUOUS_COLUMNS:
    continuous_inputs.append(tf.keras.Input(name=column_name, shape=(1,), dtype=tf.float32))
continuous_embedding_layer = tf.keras.layers.Concatenate(axis=1)
continuous_x = continuous_embedding_layer(continuous_inputs)
continuous_x = tf.keras.layers.BatchNormalization(epsilon=1e-5, momentum=0.1)(continuous_x)

# concatenate and build MLP
x = tf.keras.layers.Concatenate(axis=1)([categorical_x, continuous_x])
for dim, dropout_rate in zip(HIDDEN_DIMS, DROPOUT_RATES):
    x = tf.keras.layers.Dense(dim, activation="relu")(x)
    x = tf.keras.layers.BatchNormalization(epsilon=1e-5, momentum=0.1)(x)
    x = tf.keras.layers.Dropout(dropout_rate)(x)
x = tf.keras.layers.Dense(1, activation="linear")(x)

# TODO: Initialize model weights to fix saturation issues.
# For now, we'll just scale the output of our model directly before
# hitting the sigmoid.
x = 0.1 * x

x = MAX_LOG_SALES_PREDICTION * tf.keras.activations.sigmoid(x)

# combine all our inputs into a single list
# (note that you can still use .fit, .predict, etc. on a dict
# that maps input tensor names to input values)
inputs = list(categorical_inputs.values()) + continuous_inputs
tf_model = tf.keras.Model(inputs=inputs, outputs=x)
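
Optionally, inspect the assembled architecture before training:

# prints a layer-by-layer summary with output shapes and parameter counts
tf_model.summary()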

TensorFlow: Training

def rmspe_tf(y_true, y_pred):
    # map back into "true" space by undoing transform
    y_true = tf.exp(y_true) - 1
    y_pred = tf.exp(y_pred) - 1

    percent_error = (y_true - y_pred) / y_true
    return tf.sqrt(tf.reduce_mean(percent_error ** 2))
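
Because the labels were log-transformed during preprocessing, the metric first maps both tensors back to sales space with exp(y) - 1 before computing the relative error. A toy check with made-up values, assuming (as the metric itself implies) that labels are stored as log(sales + 1):

# predictions that are 10% off in sales space should give an RMSPE of ~0.1
y = tf.math.log(tf.constant([100.0, 200.0]) + 1.0)      # "true" log sales
y_hat = tf.math.log(tf.constant([110.0, 180.0]) + 1.0)  # 10% over / 10% under
print(rmspe_tf(y, y_hat).numpy())  # ~0.1
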
%%time
from time import time

optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
tf_model.compile(optimizer, "mse", metrics=[rmspe_tf])

validation_callback = KerasSequenceValidater(valid_dataset)
start = time()
history = tf_model.fit(
    train_dataset,
    callbacks=[validation_callback],
    epochs=EPOCHS,
)
t_final = time() - start
total_rows = train_dataset.num_rows_processed + valid_dataset.num_rows_processed
print(
    f"run_time: {t_final} - rows: {total_rows} - epochs: {EPOCHS} - dl_thru: { (EPOCHS * total_rows) / t_final}"
)
Epoch 1/25
16/16 [==============================] - 10s 122ms/step - loss: 6.2911 - rmspe_tf: 0.8823
{'val_loss': 6.8325696, 'val_rmspe_tf': 0.888467}
Epoch 2/25
16/16 [==============================] - 3s 121ms/step - loss: 5.4513 - rmspe_tf: 0.8872
{'val_loss': 6.171084, 'val_rmspe_tf': 0.886745}
Epoch 3/25
16/16 [==============================] - 3s 121ms/step - loss: 4.5784 - rmspe_tf: 0.8713
{'val_loss': 5.658548, 'val_rmspe_tf': 0.8701949}
Epoch 4/25
16/16 [==============================] - 2s 118ms/step - loss: 3.6675 - rmspe_tf: 0.8446
{'val_loss': 4.995923, 'val_rmspe_tf': 0.8385216}
Epoch 5/25
16/16 [==============================] - 2s 120ms/step - loss: 2.6557 - rmspe_tf: 0.7937
{'val_loss': 4.2057276, 'val_rmspe_tf': 0.77560455}
Epoch 6/25
16/16 [==============================] - 2s 120ms/step - loss: 1.5285 - rmspe_tf: 0.6915
{'val_loss': 3.5816908, 'val_rmspe_tf': 0.65663767}
Epoch 7/25
16/16 [==============================] - 2s 121ms/step - loss: 0.6586 - rmspe_tf: 0.5316
{'val_loss': 3.3899677, 'val_rmspe_tf': 0.57880354}
Epoch 8/25
16/16 [==============================] - 3s 119ms/step - loss: 0.3816 - rmspe_tf: 0.4746
{'val_loss': 3.0101893, 'val_rmspe_tf': 0.47044784}
Epoch 9/25
16/16 [==============================] - 2s 119ms/step - loss: 0.1996 - rmspe_tf: 0.4249
{'val_loss': 2.9503767, 'val_rmspe_tf': 0.4278682}
Epoch 10/25
16/16 [==============================] - 2s 120ms/step - loss: 0.1072 - rmspe_tf: 0.3745
{'val_loss': 2.987262, 'val_rmspe_tf': 0.3785739}
Epoch 11/25
16/16 [==============================] - 2s 118ms/step - loss: 0.0792 - rmspe_tf: 0.3508
{'val_loss': 2.9500449, 'val_rmspe_tf': 0.33269802}
Epoch 12/25
16/16 [==============================] - 2s 120ms/step - loss: 0.0678 - rmspe_tf: 0.3315
{'val_loss': 2.9160886, 'val_rmspe_tf': 0.30033675}
Epoch 13/25
16/16 [==============================] - 2s 124ms/step - loss: 0.0503 - rmspe_tf: 0.2818
{'val_loss': 2.8887079, 'val_rmspe_tf': 0.2663356}
Epoch 14/25
16/16 [==============================] - 2s 120ms/step - loss: 0.0427 - rmspe_tf: 0.2464
{'val_loss': 2.899424, 'val_rmspe_tf': 0.24336538}
Epoch 15/25
16/16 [==============================] - 3s 125ms/step - loss: 0.0403 - rmspe_tf: 0.2514
{'val_loss': 2.8832, 'val_rmspe_tf': 0.23771751}
Epoch 16/25
16/16 [==============================] - 2s 118ms/step - loss: 0.0371 - rmspe_tf: 0.2486
{'val_loss': 2.8893273, 'val_rmspe_tf': 0.22341083}
Epoch 17/25
16/16 [==============================] - 3s 122ms/step - loss: 0.0374 - rmspe_tf: 0.2290
{'val_loss': 2.885254, 'val_rmspe_tf': 0.22222914}
Epoch 18/25
16/16 [==============================] - 3s 122ms/step - loss: 0.0341 - rmspe_tf: 0.2464
{'val_loss': 2.900585, 'val_rmspe_tf': 0.22143732}
Epoch 19/25
16/16 [==============================] - 2s 120ms/step - loss: 0.0354 - rmspe_tf: 0.2218
{'val_loss': 2.8876286, 'val_rmspe_tf': 0.21798971}
Epoch 20/25
16/16 [==============================] - 2s 122ms/step - loss: 0.0353 - rmspe_tf: 0.2260
{'val_loss': 2.8935297, 'val_rmspe_tf': 0.23429932}
Epoch 21/25
16/16 [==============================] - 2s 122ms/step - loss: 0.0477 - rmspe_tf: 0.2582
{'val_loss': 2.8810875, 'val_rmspe_tf': 0.23292302}
Epoch 22/25
16/16 [==============================] - 3s 123ms/step - loss: 0.0351 - rmspe_tf: 0.2307
{'val_loss': 2.8870816, 'val_rmspe_tf': 0.22246556}
Epoch 23/25
16/16 [==============================] - 3s 120ms/step - loss: 0.0478 - rmspe_tf: 0.2583
{'val_loss': 2.8833861, 'val_rmspe_tf': 0.24750796}
Epoch 24/25
16/16 [==============================] - 3s 119ms/step - loss: 0.0349 - rmspe_tf: 0.2257
{'val_loss': 2.91596, 'val_rmspe_tf': 0.23121282}
Epoch 25/25
16/16 [==============================] - 2s 121ms/step - loss: 0.0362 - rmspe_tf: 0.2248
{'val_loss': 2.9000213, 'val_rmspe_tf': 0.22009881}
run_time: 79.32018089294434 - rows: 646 - epochs: 25 - dl_thru: 203.60518367698995
CPU times: user 2min 19s, sys: 25.2 s, total: 2min 44s
Wall time: 1min 19s
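
TensorFlow: Exporting for Triton Inference Server

Finally, we export the trained TensorFlow model together with the fitted NVTabular workflow as an ensemble for Triton Inference Server, so that the same preprocessing used during training is applied at inference time.
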
from nvtabular.inference.triton import export_tensorflow_ensemble
import nvtabular

BASE_DIR = os.environ.get("BASE_DIR", os.path.expanduser("~/nvt-examples/"))
MODEL_NAME_ENSEMBLE = os.environ.get("MODEL_NAME_ENSEMBLE", "rossmann")
# model path to save the models
MODEL_PATH = os.path.join(BASE_DIR, "models/")

workflow = nvtabular.Workflow.load(os.path.join(DATA_DIR, "workflow"))
export_tensorflow_ensemble(tf_model, workflow, MODEL_NAME_ENSEMBLE, MODEL_PATH, LABEL_COLUMNS)
INFO:tensorflow:Assets written to: /root/nvt-examples/models/rossmann_tf/1/model.savedmodel/assets
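
The export writes a Triton model repository under MODEL_PATH; it typically contains one directory for the NVTabular preprocessing workflow, one for the TensorFlow model, and one for the ensemble that ties them together (exact names depend on the NVTabular version). You can inspect the generated layout:

!ls $MODEL_PATH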