Accelerated Training with TensorFlow
When training pipelines with TensorFlow, the dataloader often cannot prepare consecutive batches fast enough, so the GPU is not fully utilized. To combat this issue, we developed a highly customized tabular dataloader, KerasSequenceLoader, to accelerate existing TensorFlow pipelines. In our experiments, the same training workflow ran 9 times faster with the NVTabular dataloader. The NVTabular dataloader is capable of:
removing bottlenecks from dataloading by processing large chunks of data at a time instead of item by item
processing datasets that don’t fit within the GPU or CPU memory by streaming from the disk
reading data directly into the GPU memory and removing CPU-GPU communication
preparing batches asynchronously on the GPU to avoid CPU-GPU communication
supporting commonly used formats such as Parquet
integrating easily into existing TensorFlow training pipelines by using an API similar to the native TensorFlow dataloader and by working with tf.keras models
When KerasSequenceLoader accelerates training with TensorFlow, the following happens:
The required libraries are imported. The dataloader loads and prepares batches directly on the GPU, so it requires part of the GPU memory. Before TensorFlow is initialized, the amount of memory allocated to TensorFlow needs to be controlled so that the remaining memory is available to the dataloader. The environment variable TF_MEMORY_ALLOCATION can be used to control the TensorFlow memory allocation.
import os
import tensorflow as tf

# Control how much memory to give TensorFlow with this environment variable
# IMPORTANT: Do this before you initialize the TensorFlow runtime, otherwise
# it's too late and TensorFlow will claim all free GPU memory
os.environ['TF_MEMORY_ALLOCATION'] = "8192"  # explicit MB
os.environ['TF_MEMORY_ALLOCATION'] = "0.5"   # fraction of free memory

from nvtabular.loader.tensorflow import KerasSequenceLoader, KerasSequenceValidater
The data schema is defined with tf.feature_column: the categorical input features (CATEGORICAL_COLUMNS) are fed through an embedding layer, and the continuous input features (CONTINUOUS_COLUMNS) are defined with numeric_column. EMBEDDING_TABLE_SHAPES is a dictionary that contains a (cardinality, embedding_size) tuple for each categorical feature.
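For illustration only, the column lists and embedding table shapes might look like the following sketch; the column names, cardinalities, embedding sizes, and batch size here are hypothetical placeholders, and LABEL_COLUMNS (used later when training) would be defined the same way.

# Hypothetical example values -- replace with the columns and shapes
# produced by your own NVTabular workflow
CATEGORICAL_COLUMNS = ["userId", "movieId"]
CONTINUOUS_COLUMNS = ["age"]
LABEL_COLUMNS = ["rating"]
BATCH_SIZE = 65536

# one (cardinality, embedding_size) tuple per categorical feature
EMBEDDING_TABLE_SHAPES = {
    "userId": (162541, 512),
    "movieId": (56586, 256),
}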
def make_categorical_embedding_column(name, dictionary_size, embedding_dim):
    return tf.feature_column.embedding_column(
        tf.feature_column.categorical_column_with_identity(name, dictionary_size),
        embedding_dim
    )

# instantiate the columns
categorical_columns = [
    make_categorical_embedding_column(name, *EMBEDDING_TABLE_SHAPES[name])
    for name in CATEGORICAL_COLUMNS
]
continuous_columns = [
    tf.feature_column.numeric_column(name, (1,))
    for name in CONTINUOUS_COLUMNS
]
The NVTabular dataloader is initialized. It accepts a list of filenames or a glob pattern as input, which it will load and iterate over. feature_columns defines the data structure, reusing the tf.feature_column definitions from the previous step. The batch_size, label_names (target columns), shuffle, and buffer_size parameters are also set.
import glob

TRAIN_PATHS = glob.glob("./train/*.parquet")

train_dataset_tf = KerasSequenceLoader(
    TRAIN_PATHS,                 # you could also use a glob pattern
    feature_columns=categorical_columns + continuous_columns,
    batch_size=BATCH_SIZE,
    label_names=LABEL_COLUMNS,
    shuffle=True,
    buffer_size=0.06             # amount of data, as a fraction of GPU memory, to load at one time
)
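As a quick sanity check, the loader can also be iterated over directly; the sketch below assumes each batch is yielded as a tuple of a feature dictionary (keyed by column name) and a label tensor.

# Pull one batch from the dataloader and inspect its shapes
# (assumes the loader yields (features, labels) tuples)
features, labels = next(iter(train_dataset_tf))
print({name: tensor.shape for name, tensor in features.items()})
print(labels.shape)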
The TensorFlow Keras model (tf.keras.Model) is defined, assuming a neural network architecture has already been built in which inputs are the input tensors and output is the output tensor.
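The walkthrough does not show how inputs and output are built. One possible way to construct them from the feature columns defined earlier is sketched below; the use of tf.keras.layers.DenseFeatures and the layer sizes are illustrative assumptions, not part of the original pipeline.

# One Keras Input per feature; the dataloader feeds batches as a dictionary
# keyed by column name, so the input names must match the column names
inputs = {}
for name in CATEGORICAL_COLUMNS:
    inputs[name] = tf.keras.Input(name=name, shape=(1,), dtype=tf.int64)
for name in CONTINUOUS_COLUMNS:
    inputs[name] = tf.keras.Input(name=name, shape=(1,), dtype=tf.float32)

# DenseFeatures applies the embedding and numeric feature columns to the raw inputs
x = tf.keras.layers.DenseFeatures(categorical_columns + continuous_columns)(inputs)
x = tf.keras.layers.Dense(128, activation="relu")(x)
output = tf.keras.layers.Dense(1, activation="sigmoid")(x)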
model = tf.keras.Model(inputs=inputs, outputs=output)
model.compile('sgd', 'binary_crossentropy')
The model is trained with model.fit using the NVTabular dataloader.
history = model.fit(train_dataset_tf, epochs=5)
Note: If the NVTabular dataloader is also used for the validation dataset, it can be passed to model.fit through the KerasSequenceValidater callback.
valid_dataset_tf = KerasSequenceLoader(...)
validation_callback = KerasSequenceValidater(valid_dataset_tf)

history = model.fit(train_dataset_tf, callbacks=[validation_callback], epochs=5)
You can find additional examples, such as MovieLens, in our repository.