merlin.loader.tensorflow.Loader

class merlin.loader.tensorflow.Loader(dataset, batch_size, shuffle=True, seed_fn=None, parts_per_chunk=1, global_size=None, global_rank=None, drop_last=False)[source]

Bases: keras.utils.data_utils.Sequence, merlin.loader.loader_base.LoaderBase

Infinite generator used to asynchronously iterate through CSV or Parquet dataframes on GPU by leveraging a merlin.io.Dataset.

This lazily loads merlin.io.Dataset objects and outputs tabular dictionaries of TensorFlow Tensors via dlpack. Useful for training tabular models built in Keras and trained via tf.keras.Model.fit.

The data loading scheme is implemented by loading, preprocessing, and batching data in an asynchronous thread. The amount of randomness in shuffling is controlled by the buffer_size and parts_per_chunk kwargs. At load time, sub-chunks of data with size controlled by buffer_size are loaded from random partitions in the dataset, and parts_per_chunk of them are concatenated into a single chunk, shuffled, and split into batches. This means that each chunk has buffer_size * parts_per_chunk rows, and due to the asynchronous nature of the dataloader there are, including the batch being processed by your network, 3 * buffer_size * parts_per_chunk rows of data in GPU memory at any given time. Consequently, for a fixed memory budget, using a larger parts_per_chunk comes at the expense of a smaller buffer_size, increasing the number of reads and reducing throughput. The goal should be to maximize the total amount of memory utilized at once without going OOM, using the fewest number of reads that meets your epoch-level randomness needs.
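The memory arithmetic above can be made concrete with a short calculation. The row counts here are purely illustrative, not defaults:

```python
# Worked example of the chunking memory math, with hypothetical numbers.
rows_per_buffer = 100_000   # rows per sub-chunk, as set by buffer_size
parts_per_chunk = 4         # sub-chunks concatenated per shuffled chunk

# Rows in one shuffled chunk:
rows_per_chunk = rows_per_buffer * parts_per_chunk

# Rows resident in GPU memory at once (loading, preprocessed, and
# in-flight chunks, per the 3x factor described above):
rows_resident = 3 * rows_per_buffer * parts_per_chunk

print(rows_per_chunk)   # 400000
print(rows_resident)    # 1200000
```

Doubling parts_per_chunk under the same memory budget would require halving rows_per_buffer, which doubles the number of reads per chunk.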

An important thing to note is that TensorFlow’s default behavior is to claim all GPU memory for itself at initialization time, which leaves none for this class to load or preprocess data. As such, we attempt to configure TensorFlow to restrict its memory allocation on a given GPU using the environment variables TF_MEMORY_ALLOCATION and TF_VISIBLE_DEVICE. If TF_MEMORY_ALLOCATION < 1, it will be assumed that this refers to a fraction of free GPU memory on the given device. Otherwise, it will refer to an explicit allocation amount in MB. TF_VISIBLE_DEVICE should be an integer GPU index.
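For example, the variables might be set like this before TensorFlow is imported (the values below are illustrative, not recommendations):

```python
import os

# Must be set before TensorFlow (and therefore this Loader) is imported.
os.environ["TF_MEMORY_ALLOCATION"] = "0.5"  # < 1: fraction of free GPU memory
os.environ["TF_VISIBLE_DEVICE"] = "0"       # integer GPU index

# import tensorflow as tf  # import only after the variables are set
```

Setting them on the command line (e.g. `TF_MEMORY_ALLOCATION=0.5 python train.py`) works equally well, since all that matters is that they are in the environment before TensorFlow initializes.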

Iterator output is of the form (dict(features), list(labels)), where each element of the features dict is a feature_name: feature_tensor and each element of the labels list is a tensor, and all tensors are of shape (batch_size, 1).

Parameters
  • dataset (merlin.io.Dataset) – The dataset to load

  • batch_size (int) – Number of rows to yield at each iteration

  • shuffle (bool, default True) – Whether to shuffle chunks of batches before iterating through them.

  • seed_fn (callable) – Function used to initialize random state

  • parts_per_chunk (int) – Number of dataset partitions with size dictated by buffer_size to load and concatenate asynchronously. More partitions leads to better epoch-level randomness but can negatively impact throughput

  • global_size (int, optional) – When doing distributed training, this indicates the number of total processes that are training the model.

  • global_rank (int, optional) – When doing distributed training, this indicates the local rank for the current process.

  • drop_last (bool, default False) – Whether or not to drop the last batch in an epoch. This is useful when you need to guarantee that each batch contains exactly batch_size rows, since the last batch will usually contain fewer.
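A minimal end-to-end sketch tying the parameters above together, assuming a Parquet dataset on disk (the path and model details here are hypothetical):

```python
import tensorflow as tf
from merlin.io import Dataset
from merlin.loader.tensorflow import Loader

# Hypothetical glob of Parquet files; anything merlin.io.Dataset accepts works.
dataset = Dataset("/path/to/data/*.parquet")

loader = Loader(
    dataset,
    batch_size=1024,
    shuffle=True,
    drop_last=True,  # guarantee every batch has exactly 1024 rows
)

# Any tf.keras.Model whose inputs match the feature columns can then be
# trained directly on the loader:
# model.fit(loader, epochs=5)
```

Because Loader subclasses keras.utils.data_utils.Sequence, it can be passed to tf.keras.Model.fit directly in place of a tf.data.Dataset.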

__init__(dataset, batch_size, shuffle=True, seed_fn=None, parts_per_chunk=1, global_size=None, global_rank=None, drop_last=False)[source]

Methods

__init__(dataset, batch_size[, shuffle, …])

epochs([epochs])

Create a dataloader that will efficiently run for more than one epoch.

make_tensors(gdf[, use_nnz])

Turns a gdf into a tensor representation by column

map(fn)

Apply a function to each batch.

on_epoch_end()

Method called at the end of every epoch.

stop()

Halts and resets the initialization parameters of the dataloader.

map(fn)[source]

Apply a function to each batch.

This can be used, for instance, to add a sample_weight to the model.
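A sketch of a mapped function for sample weighting. In the real loader the arguments are TensorFlow tensors delivered per batch; plain Python lists stand in for them here, and the 2x-weighting rule is purely hypothetical:

```python
# Hypothetical map function: weight positive examples twice as heavily.
# The required shape is (features, labels) -> (features, labels, weights),
# so that Keras receives sample weights alongside each batch.
def add_sample_weights(features, labels):
    weights = [2.0 if label else 1.0 for label in labels]
    return features, labels, weights

# Stand-in batch, using plain Python values in place of tensors:
features = {"item_id": [11, 42, 7]}
labels = [1, 0, 1]

_, _, weights = add_sample_weights(features, labels)
print(weights)  # [2.0, 1.0, 2.0]

# With a real loader this would be attached via:
# loader = loader.map(add_sample_weights)
```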