#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
from typing import Optional, Protocol, Union
import dask.dataframe as dd
import numpy as np
import tensorflow as tf
import merlin.dataloader.tensorflow
from merlin.core.dispatch import HAS_GPU
from merlin.dataloader.tf_utils import get_dataset_schema_from_feature_columns
from merlin.io import Dataset
from merlin.models.loader.backend import _augment_schema
from merlin.models.tf.distributed.backend import hvd, hvd_installed
from merlin.models.utils.schema_utils import select_targets
from merlin.schema import Schema, Tags
LOG = logging.getLogger("merlin.models")
# pylint has issues with TF array ops, so disable checks until fixed:
# https://github.com/PyCQA/pylint/issues/3613
# pylint: disable=no-value-for-parameter,unexpected-keyword-arg,redundant-keyword-arg
dd_engine = {
"parquet": dd.read_parquet,
"csv": dd.read_csv,
"df": dd.DataFrame,
}
def _validate_dataset(paths_or_dataset, batch_size, buffer_size, engine, device, reader_kwargs):
# TODO: put this in parent class and allow
# torch dataset to leverage as well?
# if a dataset was passed, just return it
if hasattr(paths_or_dataset, "schema"):
return paths_or_dataset
# otherwise initialize a dataset
# from paths or glob pattern
if isinstance(paths_or_dataset, str):
files = tf.io.gfile.glob(paths_or_dataset)
parent, file = os.path.split(paths_or_dataset)
_is_empty_msg = f"Couldn't find file pattern {file} in directory {parent}"
else:
# TODO: some checking around attribute
# error here?
files = list(paths_or_dataset)
_is_empty_msg = "paths_or_dataset list must contain at least one filename"
assert isinstance(files, list)
if len(files) == 0:
raise ValueError(_is_empty_msg)
if not engine:
# default engine is parquet
engine = "parquet"
cpu = device and "cpu" in device
return Dataset(files, engine=engine, cpu=cpu)
def _validate_schema(feature_columns, cat_names, cont_names, label_names, schema=None):
_uses_feature_columns = feature_columns is not None
_uses_explicit_schema = (cat_names is not None) or (cont_names is not None)
cat_tag_names = (
schema.select_by_tag([Tags.CATEGORICAL]).excluding_by_tag(Tags.TARGET).column_names
if schema
else []
)
cont_tag_names = (
schema.select_by_tag([Tags.CONTINUOUS]).excluding_by_tag(Tags.TARGET).column_names
if schema
else []
)
label_names = label_names or []
_uses_dataset_schema = cat_tag_names or cont_tag_names
if _uses_feature_columns and _uses_explicit_schema:
raise ValueError(
"Passed `feature_column`s and explicit column names, must be one or the other"
)
elif _uses_feature_columns:
cat_names, cont_names = get_dataset_schema_from_feature_columns(feature_columns)
return cat_names, cont_names, label_names
elif _uses_explicit_schema:
cat_names = cat_names or []
cont_names = cont_names or []
return cat_names, cont_names, label_names
elif _uses_dataset_schema:
label_names = label_names or select_targets(schema).column_names
return cat_tag_names, cont_tag_names, label_names
else:
raise ValueError(
"Must either pass a list of TensorFlow `feature_column`s "
"or explicit `cat_name` and `cont_name` column name lists."
)
def _get_schema(dataset):
if hasattr(dataset, "schema"):
return dataset.schema
return None
class SchemaAwareTransform(Protocol):
def __call__(self, *args, **kwargs):
...
def compute_output_schema(self, schema: Schema):
...
[docs]class Loader(merlin.dataloader.tensorflow.Loader):
"""
Override class to customize data loading for backward compatibility with
older NVTabular releases.
In most cases, when you use the `merlin.io.Dataset` class, data loading
for model training and evaluation is performed automatically by Merlin Models.
Infinite generator used to asynchronously iterate through CSV or Parquet
dataframes on GPU by leveraging an NVTabular `Dataset`. Applies preprocessing
via NVTabular `Workflow` objects and outputs tabular dictionaries of TensorFlow
Tensors via `dlpack <https://github.com/dmlc/dlpack>`_. Useful for training tabular models
built in Keras and trained via
`tf.keras.Model.fit <https://www.tensorflow.org/api_docs/python/tf/keras/Model>`_.
The data loading scheme is implemented by loading, preprocessing, and
batching data in an asynchronous thread. The amount of randomness in
shuffling is controlled by the `buffer_size` and `parts_per_chunk`
kwargs. At load time, sub-chunks of data with size controlled by
`buffer_size` are loaded from random partitions in the dataset,
and `parts_per_chunk` of them are concatenated into a single chunk,
shuffled, and split into batches. This means that each chunk has
`buffer_size*parts_per_chunk` rows, and due to the asynchronous
nature of the dataloader that means there are, including the batch
being processed by your network, `3*buffer_size*parts_per_chunk`
rows of data in GPU memory at any given time. This means that
for a fixed memory budget, using more `parts_per_chunk` will
come at the expense of smaller `buffer_size`, increasing the number
of reads and reducing throughput. The goal should be to maximize the
total amount of memory utilized at once without going OOM and with
the fewest number of reads to meet your epoch-level randomness needs.
An important thing to note is that TensorFlow's default behavior
is to claim all GPU memory for itself at initialziation time,
which leaves none for NVTabular to load or preprocess data.
As such, we attempt to configure TensorFlow to restrict
its memory allocation on a given GPU using the environment variables
`TF_MEMORY_ALLOCATION` and `TF_VISIBLE_DEVICE`. If `TF_MEMORY_ALLOCATION < 1`,
it will be assumed that this refers to a fraction of free GPU
memory on the given device. Otherwise, it will refer to an explicit
allocation amount in MB. `TF_VISIBLE_DEVICE` should be an integer GPU
index.
Iterator output is of the form `(dict(features), list(labels))`,
where each element of the features dict is a
`feature_name: feature_tensor` and each elemtn of the labels
list is a tensor, and all tensors are of shape `(batch_size, 1)`.
Note that this means vectorized continuous and multi-hot categorical
features are not currently supported.
The underlying NVTabular `Dataset` object is stored in the `data`
attribute, and should be used for updating NVTabular `Workflow`
statistics::
workflow = nvt.Workflow(...)
dataset = KerasSequenceLoader(...)
workflow.update_stats(dataset.data.to_iter(), record_stats=True)
Parameters
-------------
paths_or_dataset: str or list(str)
Either a string representing a file pattern (see `tf.glob` for
pattern rules), a list of filenames to be iterated through, or
a Dataset object, in which case `buffer_size`, `engine`, and
`reader_kwargs` will be ignored
batch_size: int
Number of samples to yield at each iteration
label_names: list(str)
Column name of the target variable in the dataframe specified by
`paths_or_dataset`
feature_columns: list(tf.feature_column) or None
A list of TensorFlow feature columns representing the inputs
exposed to the model to be trained. Columns with parent columns
will climb the parent tree, and the names of the columns in the
unique set of terminal columns will be used as the column names.
If left as None, must specify `cat_names` and `cont_names`
cat_names: list(str) or None
List of categorical column names. Ignored if `feature_columns` is
specified
cont_names: list(str) or None
List of continuous column names. Ignored if `feature_columns` is
specified
engine: {'csv', 'parquet', None}, default None
String specifying the type of read engine to use. If left as `None`,
will try to infer the engine type from the file extension.
shuffle: bool, default True
Whether to shuffle chunks of batches before iterating through them.
buffer_size: float or int
If `0 < buffer_size < 1`, `buffer_size` will refer to the fraction of
total GPU memory to occupy with a buffered chunk. If `1 < buffer_size <
batch_size`, the number of rows read for a buffered chunk will
be equal to `int(buffer_size*batch_size)`. Otherwise, if `buffer_size >
batch_size`, `buffer_size` rows will be read in each chunk (except for
the last chunk in a dataset, which will, in general, be smaller).
Larger chunk sizes will lead to more efficiency and randomness,
but require more memory.
device: None
Which GPU device to load from. Ignored for now
parts_per_chunk: int
Number of dataset partitions with size dictated by `buffer_size`
to load and concatenate asynchronously. More partitions leads to
better epoch-level randomness but can negatively impact throughput
reader_kwargs: dict
extra kwargs to pass when instantiating the underlying
`nvtabular.Dataset`
sparse_list : list(str) or None
list with column names of columns that should be represented as sparse tensors
sparse_max : dict
dictionary of key: column_name + value: integer representing max sequence length for column
sparse_as_dense : bool
bool value to activate transforming sparse tensors to dense
"""
[docs] def __init__(
self,
paths_or_dataset,
batch_size,
label_names=None,
feature_columns=None,
cat_names=None,
cont_names=None,
engine=None,
shuffle=True,
seed_fn=None,
buffer_size=0.1,
device=None,
parts_per_chunk=1,
reader_kwargs=None,
global_size=None,
global_rank=None,
drop_last=False,
sparse_names=None,
sparse_max=None,
sparse_as_dense=False,
schema=None,
**loader_kwargs,
):
dataset = _validate_dataset(
paths_or_dataset, batch_size, buffer_size, engine, device, reader_kwargs
)
if schema:
dataset.schema = schema
cat_names = cat_names or (
dataset.schema.select_by_tag(Tags.CATEGORICAL)
.excluding_by_tag(Tags.TARGET)
.column_names
if _get_schema(dataset)
else []
)
cont_names = cont_names or (
dataset.schema.select_by_tag(Tags.CONTINUOUS).excluding_by_tag(Tags.TARGET).column_names
if _get_schema(dataset)
else []
)
label_names = label_names or (
dataset.schema.select_by_tag(Tags.TARGET).column_names if _get_schema(dataset) else []
)
cat_names, cont_names, label_names = _validate_schema(
feature_columns, cat_names, cont_names, label_names, schema=dataset.schema
)
dataset.schema = _augment_schema(
dataset.schema,
cat_names,
cont_names,
label_names,
sparse_names,
sparse_max,
sparse_as_dense,
)
device = "cpu" if not HAS_GPU else device
if hvd_installed and hvd.size() > 1:
device = hvd.local_rank()
global_size = global_size or hvd.size()
global_rank = global_rank or hvd.rank()
seed_fn = seed_fn or get_default_hvd_seed_fn()
super().__init__(
dataset,
batch_size,
shuffle=shuffle,
seed_fn=seed_fn,
parts_per_chunk=parts_per_chunk,
device=device,
global_size=global_size,
global_rank=global_rank,
drop_last=drop_last,
**loader_kwargs,
)
# Override these parameters after initializing the parent dataloader
# class since the new dataloader will use sparse tensors for list
# columns by default, but sparse tensors were disabled by default
# and were optional in the old version of merlin.loader.
self.sparse_names = sparse_names or []
self.sparse_max = sparse_max or {}
self.sparse_as_dense = sparse_as_dense
@property
def output_schema(self) -> Schema:
output_schema = super().output_schema
for map_fn in self._map_fns:
if hasattr(map_fn, "compute_output_schema"):
output_schema = map_fn.compute_output_schema(output_schema)
else:
raise ValueError(
f"Couldn't infer schema from transform {map_fn}. "
"Please implement the `compute_output_schema` method on "
"the transform layer."
)
return output_schema
@property
def has_transforms(self) -> bool:
"""Returns True if Loader has transforms or map functions.
Returns
-------
bool
True if Loader has transforms or map functions, otherwise False.
"""
return len(self._map_fns) > 0 or self.transforms is not None
KerasSequenceValidater = (
KerasSequenceValidator
) = merlin.dataloader.tensorflow.KerasSequenceValidater
def sample_batch(
dataset_or_loader: Union[Dataset, Loader],
batch_size: Optional[int] = None,
shuffle: Optional[bool] = False,
include_targets: Optional[bool] = True,
prepare_features: Optional[bool] = True,
):
"""Util function to generate a batch of input tensors from a merlin.io.Dataset instance
Parameters
----------
data: merlin.io.dataset
A Dataset object.
batch_size: int
Number of samples to return.
shuffle: bool
Whether to sample a random batch or not, by default False.
include_targets: bool
Whether to include the targets in the returned batch, by default True.
prepare_features: bool
Whether to prepare features from dataloader for the model, by default False.
If enabled, it converts multi-hot/list features to dense or ragged based on the schema.
It also ensures that scalar features are converted to 2D (batch size, 1).
P.s. The features are automatically prepared by InputBlockV2 if it is used
Returns
-------
batch: Dict[tf.tensor]
dictionary of input tensors.
"""
from merlin.models.tf.transforms.features import PrepareFeatures
if isinstance(dataset_or_loader, Dataset):
if not batch_size:
raise ValueError("Either use 'Loader' or specify 'batch_size'")
loader = Loader(dataset_or_loader, batch_size=batch_size, shuffle=shuffle)
else:
loader = dataset_or_loader
batch = loader.peek()
# batch could be of type Prediction, so we can't unpack directly
inputs, targets = batch[0], batch[1]
if prepare_features:
pf = PrepareFeatures(loader.output_schema)
if targets is not None:
inputs, targets = pf(inputs, targets=targets)
else:
inputs = pf(inputs)
if not include_targets:
return inputs
return inputs, targets
def get_default_hvd_seed_fn(seed=None):
"""
Generate consistent dataloader shuffle seeds across workers
Reseeds each worker's dataloader each epoch to get fresh a shuffle
that's consistent across workers.
"""
if HAS_GPU:
import cupy
cupy.random.seed(seed)
else:
np.random.seed(seed)
if hvd_installed:
import horovod
else:
raise ImportError("'horovod' is required to use this function.")
def _seed_fn():
min_int, max_int = tf.int32.limits
max_rand = max_int // horovod.tensorflow.keras.size()
# Generate a seed fragment on each worker
if HAS_GPU:
seed_fragment = cupy.random.randint(0, max_rand).get()
else:
seed_fragment = np.random.randint(0, max_rand)
# Aggregate seed fragments from all Horovod workers
seed_tensor = tf.constant(seed_fragment)
reduced_seed = hvd.allreduce(
seed_tensor, name="shuffle_seed", op=horovod.tensorflow.mpi_ops.Sum
)
return reduced_seed % max_rand
return _seed_fn