merlin.io.Dataset

class merlin.io.Dataset(path_or_source, engine=None, npartitions=None, part_size=None, part_mem_fraction=None, storage_options=None, dtypes=None, client='auto', cpu=None, base_dataset=None, schema=None, **kwargs)

Bases: object

Universal external-data wrapper for NVTabular

The NVTabular Workflow and DataLoader-related APIs require all external data to be converted to the universal Dataset type. The main purpose of this class is to abstract away the raw format of the data, and to allow other NVTabular classes to reliably materialize a dask_cudf.DataFrame collection (and/or collection-based iterator) on demand.

A new Dataset object can be initialized from a variety of different raw-data formats. To initialize an object from a directory path or file list, the engine argument should be used to specify either “parquet” or “csv” format. If the first argument contains a list of files with a suffix of either “parquet” or “csv”, the engine can be inferred:

# Initialize Dataset with a parquet-dataset directory.
# must specify engine="parquet"
dataset = Dataset("/path/to/data_pq", engine="parquet")

# Initialize Dataset with list of csv files.
# engine="csv" argument is optional
dataset = Dataset(["file_0.csv", "file_1.csv"])

Since NVTabular leverages fsspec as a file-system interface, the underlying data can be stored either locally or in a remote/cloud data store. To read from remote storage, such as gcs or s3, the appropriate protocol should be prepended to the Dataset path argument(s), and any special backend parameters should be passed in a storage_options dictionary:

# Initialize Dataset with s3 parquet data
dataset = Dataset(
    "s3://bucket/path",
    engine="parquet",
    storage_options={'anon': True, 'use_ssl': False},
)

By default, both parquet and csv-based data will be converted to a Dask-DataFrame collection with a maximum partition size of roughly 12.5 percent of the total memory on a single device. The partition size can be changed to a different fraction of total memory on a single device with the part_mem_fraction argument. Alternatively, a specific byte size can be specified with the part_size argument:

# Dataset partitions will be ~10% single-GPU memory (or smaller)
dataset = Dataset("bigfile.parquet", part_mem_fraction=0.1)

# Dataset partitions will be ~1GB (or smaller)
dataset = Dataset("bigfile.parquet", part_size="1GB")

Note that, if both the fractional and literal options are used at the same time, part_size will take precedence. Also, for parquet-formatted data, the partitioning is done at the row-group level, and the byte-size of the first row-group (after CuDF conversion) is used to map all other partitions. Therefore, if the distribution of row-group sizes is not uniform, the partition sizes will not be balanced.
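The precedence rule and the row-group mapping described above can be sketched in plain Python. This is only an illustration under stated assumptions, not NVTabular's implementation: the helper names resolve_part_size and row_groups_per_partition, the 16 GiB device size, and the floor-division rounding are all hypothetical.

```python
def resolve_part_size(device_mem_bytes, part_size=None, part_mem_fraction=None):
    """Return the target partition size in bytes (hypothetical helper)."""
    if part_size is not None:
        # A literal byte size takes precedence over the fractional option.
        return int(part_size)
    # Otherwise fall back to a fraction of device memory (documented default: 0.125).
    fraction = 0.125 if part_mem_fraction is None else part_mem_fraction
    return int(device_mem_bytes * fraction)


def row_groups_per_partition(part_size_bytes, first_row_group_bytes):
    """Estimate how many row-groups go into each partition.

    Assumption: every row-group is treated as if it had the size of the
    first one, which is why non-uniform row-groups lead to unbalanced
    partitions.
    """
    return max(1, part_size_bytes // first_row_group_bytes)


device_mem = 16 * 1024**3  # hypothetical 16 GiB device

default_size = resolve_part_size(device_mem)
both_given = resolve_part_size(device_mem, part_size=10**9, part_mem_fraction=0.1)
groups = row_groups_per_partition(default_size, first_row_group_bytes=128 * 1024**2)
```

Here both_given resolves to the literal part_size even though a fraction was also supplied, and groups is computed from the first row-group's size alone.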

In addition to handling data stored on disk, a Dataset object can also be initialized from an existing CuDF/Pandas DataFrame, or from a Dask-DataFrame collection (e.g. dask_cudf.DataFrame). For these in-memory formats, the size/number of partitions will not be modified. That is, a CuDF/Pandas DataFrame (or PyArrow Table) will produce a single-partition collection, while the number/size of a Dask-DataFrame collection will be preserved:

# Initialize from CuDF DataFrame (creates 1 partition)
gdf = cudf.DataFrame(...)
dataset = Dataset(gdf)

# Initialize from Dask-CuDF DataFrame (preserves partitions)
ddf = dask_cudf.read_parquet(...)
dataset = Dataset(ddf)

Since the Dataset API can both ingest and output a Dask collection, it is straightforward to transform data either before or after an NVTabular workflow is executed. This means that some complex pre-processing operations, that are not yet supported in NVTabular, can still be accomplished with the Dask-CuDF API:

# Sort input data before final Dataset initialization
# Warning: Global sorting requires significant device memory!
ddf = Dataset("/path/to/data_pq", engine="parquet").to_ddf()
ddf = ddf.sort_values("user_rank", ignore_index=True)
dataset = Dataset(ddf)

Dataset Optimization Tips (DOTs)

The NVTabular dataset should be created from Parquet files in order to get the best possible performance, preferably with a row group size of around 128MB. While NVTabular also supports reading from CSV files, reading CSV can be over twice as slow as reading from Parquet. Take a look at this notebook for an example of transforming the original Criteo CSV dataset into a new Parquet dataset optimized for use with NVTabular.

Parameters
  • path_or_source (str, list of str, or <dask.dataframe|cudf|pd>.DataFrame) – Dataset path (or list of paths), or a DataFrame. If string, should specify a specific file or directory path. If this is a directory path, the directory structure must be flat (nested directories are not yet supported).

  • engine (str or DatasetEngine) – DatasetEngine object or string identifier of engine. Current string options include: (“parquet”, “csv”, “avro”). This argument is ignored if path_or_source is a DataFrame type.

  • npartitions (int) – Desired number of Dask-collection partitions to produce in the to_ddf method when path_or_source corresponds to a DataFrame type. This argument is ignored for file-based path_or_source input.

  • part_size (str or int) – Desired size (in bytes) of each Dask partition. If None, part_mem_fraction will be used to calculate the partition size. Note that the underlying engine may allow other custom kwargs to override this argument. This argument is ignored if path_or_source is a DataFrame type.

  • part_mem_fraction (float (default 0.125)) – Fractional size of desired dask partitions (relative to GPU memory capacity). Ignored if part_size is passed directly. Note that the underlying engine may allow other custom kwargs to override this argument. This argument is ignored if path_or_source is a DataFrame type. If cpu=True, this value will be relative to the total host memory detected by the client process.

  • storage_options (None or dict) – Further parameters to pass to the bytes backend. This argument is ignored if path_or_source is a DataFrame type.

  • cpu (bool) – WARNING: Experimental Feature! Whether NVTabular should keep all data in cpu memory when the Dataset is converted to an internal Dask collection. The default value is False, unless cudf and dask_cudf are not installed (in which case the default is True). In the future, if True, NVTabular will NOT use any available GPU devices for down-stream processing. NOTE: Down-stream ops and output do not yet support a Dataset generated with cpu=True.

  • base_dataset (Dataset) – Optional reference to the original “base” Dataset object used to construct the current Dataset instance. This object is used to preserve file-partition mapping information.

  • schema (Schema) – Optional argument, to support custom user defined Schemas. This overrides the derived schema behavior.

  • **kwargs – Key-word arguments to pass through to Dask.dataframe IO function. For the Parquet engine(s), notable arguments include filters and aggregate_files (the latter is experimental).

__init__(path_or_source, engine=None, npartitions=None, part_size=None, part_mem_fraction=None, storage_options=None, dtypes=None, client='auto', cpu=None, base_dataset=None, schema=None, **kwargs)

Methods

__init__(path_or_source[, engine, ...])

compute(*args, **kwargs)

head(*args, **kwargs)

infer_schema([n])

Create a schema containing the column names and inferred dtypes of the Dataset

merge(left, right, **kwargs)

Merge two Dataset objects

persist(*args, **kwargs)

regenerate_dataset(output_path[, columns, ...])

EXPERIMENTAL: Regenerate an NVTabular Dataset for efficient processing by writing out new Parquet files.

repartition([npartitions, partition_size])

Repartition the underlying ddf, and return a new Dataset

sample_dtypes([n, annotate_lists])

Return the real dtypes of the Dataset

shuffle_by_keys(keys[, hive_data, npartitions])

Shuffle the in-memory Dataset so that all unique-key combinations are moved to the same partition.

tail(*args, **kwargs)

to_cpu()

to_ddf([columns, shuffle, seed])

Convert Dataset object to dask_cudf.DataFrame

to_gpu()

to_hugectr(output_path, cats, conts, labels)

Writes out to a hugectr dataset

to_iter([columns, indices, shuffle, seed, ...])

Convert Dataset object to a cudf.DataFrame iterator.

to_npy(output_file[, append])

Converts a dataset into an npy file. Append mode can be used if the data is larger than memory.

to_parquet(output_path[, shuffle, ...])

Writes out to a parquet dataset

validate_dataset(**kwargs)

Validate for efficient processing.

Attributes

file_partition_map

npartitions

num_rows

partition_lens

to_ddf(columns=None, shuffle=False, seed=None)

Convert Dataset object to dask_cudf.DataFrame

Parameters
  • columns (str or list(str); default None) – Columns to include in output DataFrame. If not specified, the output will contain all known columns in the Dataset.

  • shuffle (bool; default False) – Whether to shuffle the order of partitions in the output dask_cudf.DataFrame. Note that this does not shuffle the rows within each partition. This is because the data is not actually loaded into memory for this operation.

  • seed (int; Optional) – The random seed to use if shuffle=True. If nothing is specified, the current system time will be used by the random std library.
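The shuffle semantics described above (partition order changes, rows inside each partition do not) can be illustrated without any GPU libraries. This is a minimal sketch, assuming partitions are plain Python lists; shuffle_partition_order is a hypothetical helper and not part of the Dataset API.

```python
import random


def shuffle_partition_order(partitions, seed=None):
    """Return the partitions in a random order without touching their rows."""
    order = list(range(len(partitions)))
    # A dedicated Random instance keeps the result reproducible for a given seed.
    random.Random(seed).shuffle(order)
    return [partitions[i] for i in order]


partitions = [[0, 1, 2], [3, 4], [5, 6, 7, 8]]
shuffled = shuffle_partition_order(partitions, seed=42)
repeated = shuffle_partition_order(partitions, seed=42)
```

Every element of shuffled is one of the original partitions with its rows in the original order, and passing the same seed reproduces the same partition order.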

property file_partition_map

property partition_lens

to_cpu()

to_gpu()

shuffle_by_keys(keys, hive_data=None, npartitions=None)

Shuffle the in-memory Dataset so that all unique-key combinations are moved to the same partition.

Parameters
  • keys (list(str)) – Column names to shuffle by.

  • hive_data (bool; default None) – Whether the dataset is backed by a hive-partitioned dataset (with the keys encoded in the directory structure). By default, the Dataset’s file_partition_map property will be inspected to infer this setting. When hive_data is True, the number of output partitions will correspond to the number of unique key combinations in the dataset.

  • npartitions (int; default None) – Number of partitions in the output Dataset. For hive-partitioned data, this value should be <= the number of unique key combinations (the default), otherwise it will be ignored. For data that is not hive-partitioned, the npartitions input should be <= the original partition count, otherwise it will be ignored.
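The guarantee that shuffle_by_keys provides (all rows sharing a key combination end up in the same partition) can be shown with a hash-based sketch. This is an illustration only: shuffle_rows_by_keys is a hypothetical helper operating on lists of dicts, and hashing the key tuple is an assumption about one way to get this property, not a description of the Dask shuffle used by the library.

```python
def shuffle_rows_by_keys(rows, keys, npartitions):
    """Group rows so that each unique key combination lands in one partition."""
    partitions = [[] for _ in range(npartitions)]
    for row in rows:
        key = tuple(row[k] for k in keys)
        # Equal keys hash to the same value, so they share a partition.
        partitions[hash(key) % npartitions].append(row)
    return partitions


rows = [
    {"user": 1, "day": 1, "clicks": 3},
    {"user": 2, "day": 1, "clicks": 5},
    {"user": 1, "day": 1, "clicks": 7},
    {"user": 2, "day": 2, "clicks": 1},
    {"user": 1, "day": 2, "clicks": 4},
]
shuffled = shuffle_rows_by_keys(rows, keys=["user", "day"], npartitions=2)

# Record which partition(s) each key combination ended up in.
locations = {}
for index, partition in enumerate(shuffled):
    for row in partition:
        locations.setdefault((row["user"], row["day"]), set()).add(index)
```

After the shuffle, every entry in locations contains exactly one partition index, which is the property that makes per-key operations safe to run partition by partition.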

repartition(npartitions=None, partition_size=None)

Repartition the underlying ddf, and return a new Dataset

Parameters
  • npartitions (int; default None) – Number of partitions in output Dataset. Only used if partition_size isn’t specified.

  • partition_size (int or str; default None) – Max number of bytes of memory for each partition. Use numbers or strings like ‘5MB’. If specified, npartitions will be ignored.

classmethod merge(left, right, **kwargs)

Merge two Dataset objects

Produces a new Dataset object. If the cpu Dataset attributes do not match, the right side will be modified. See Dask-Dataframe merge documentation for more information. Example usage:

ds_1 = Dataset("file.parquet")
ds_2 = Dataset(cudf.DataFrame(...))
ds_merged = Dataset.merge(ds_1, ds_2, on="foo", how="inner")

Parameters
  • left (Dataset) – Left-side Dataset object.

  • right (Dataset) – Right-side Dataset object.

  • **kwargs – Key-word arguments to be passed through to Dask-Dataframe.

to_iter(columns=None, indices=None, shuffle=False, seed=None, use_file_metadata=None, epochs=1)

Convert Dataset object to a cudf.DataFrame iterator.

Note that this method will use to_ddf to produce a dask_cudf.DataFrame, and materialize a single partition for each iteration.

Parameters
  • columns (str or list(str); default None) – Columns to include in each DataFrame. If not specified, the outputs will contain all known columns in the Dataset.

  • indices (list(int); default None) – A specific list of partition indices to iterate over. If nothing is specified, all partitions will be returned in order (or the shuffled order, if shuffle=True).

  • shuffle (bool; default False) – Whether to shuffle the order of dask_cudf.DataFrame partitions used by the iterator. If the indices argument is specified, those indices correspond to the partition indices AFTER the shuffle operation.

  • seed (int; Optional) – The random seed to use if shuffle=True. If nothing is specified, the current system time will be used by the random std library.

  • use_file_metadata (bool; Optional) – Whether to allow the returned DataFrameIter object to use file metadata from the base_dataset to estimate the row-count. By default, the file-metadata optimization will only be used if the current Dataset is backed by a file-based engine. Otherwise, it is possible that an intermediate transform has modified the row-count.

  • epochs (int) – Number of dataset passes to include within a single iterator. This option is used for multi-epoch data-loading. Default is 1.
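The interaction between shuffle, indices, and epochs described above can be sketched with a plain generator. This is a minimal illustration, assuming partitions are simple Python objects and that the same partition order is reused for every epoch; iter_partitions is a hypothetical helper, not the DataFrameIter class returned by to_iter.

```python
import random


def iter_partitions(partitions, indices=None, shuffle=False, seed=None, epochs=1):
    """Yield one partition per iteration, following the documented options."""
    order = list(range(len(partitions)))
    if shuffle:
        random.Random(seed).shuffle(order)
    if indices is not None:
        # Indices select positions in the order AFTER the shuffle.
        order = [order[i] for i in indices]
    for _ in range(epochs):
        for i in order:
            yield partitions[i]


parts = ["p0", "p1", "p2", "p3"]
two_epochs = list(iter_partitions(parts, epochs=2))
full = list(iter_partitions(parts, shuffle=True, seed=7))
subset = list(iter_partitions(parts, indices=[0, 1], shuffle=True, seed=7))
```

With the same seed, subset contains the first two partitions of the shuffled sequence in full, rather than the first two partitions of the original order.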

to_parquet(output_path, shuffle=None, preserve_files=False, output_files=None, out_files_per_proc=None, row_group_size=None, num_threads=0, dtypes=None, cats=None, conts=None, labels=None, suffix='.parquet', partition_on=None, method='subgraph', write_hugectr_keyset=False)

Writes out to a parquet dataset

Parameters
  • output_path (string) – Path to write processed/shuffled output data

  • shuffle (merlin.io.Shuffle enum) – How to shuffle the output dataset. For all options, other than None (which means no shuffling), the partitions of the underlying dataset/ddf will be randomly ordered. If PER_PARTITION is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a number (out_files_per_proc) of output files. Output files are distinctly mapped to each worker process. If PER_WORKER is specified, each worker will follow the same procedure as PER_PARTITION, but will re-shuffle each file after all data is persisted. This results in a full shuffle of the data processed by each worker. To improve performance, this option currently uses host-memory BytesIO objects for the intermediate persist stage. The FULL option is not yet implemented.

  • partition_on (str or list(str)) – Columns to use for hive-partitioning. If this option is used, preserve_files, output_files, and out_files_per_proc cannot be specified, and method will be ignored. Also, the PER_WORKER shuffle will not be supported.

  • preserve_files (bool) – Whether to preserve the original file-to-partition mapping of the base dataset. This option requires method="subgraph", and is only available if the base dataset is known, and if it corresponds to csv or parquet format. If True, the out_files_per_proc option will be ignored. Default is False.

  • output_files (dict, list or int) – The total number of desired output files. This option requires method="subgraph". When out_files_per_proc=None, the default is the number of underlying Dask partitions. When out_files_per_proc is set to an integer, the default is the product of that integer and the total number of workers in the Dask cluster. For further output-file control, this argument may also be used to pass a dictionary mapping the output file names to partition indices, or a list of desired output-file names.

  • out_files_per_proc (integer) – Number of output files that each process will use to shuffle an input partition. Default is 1. If method="worker", the total number of output files will always be the total number of Dask workers, multiplied by this argument. If method="subgraph", the total number of files is determined by output_files (and out_files_per_proc must be 1 if a dictionary is specified).

  • row_group_size (integer) – Maximum number of rows to include in each Parquet row-group. By default, the maximum row-group size will be chosen by the backend Parquet engine (cudf or pyarrow). Note that cudf currently prohibits this value from being less than 5000 rows. If smaller row-groups are necessary, try calling to_cpu() before writing to disk.

  • num_threads (integer) – Number of IO threads to use for writing the output dataset. For 0 (default), no dedicated IO threads will be used.

  • dtypes (dict) – Dictionary containing desired datatypes for output columns. Keys are column names, values are datatypes.

  • suffix (str or False) – File-name extension to use for all output files. This argument is ignored if a specific list of file names is specified using the output_files option. If preserve_files=True, this suffix will be appended to the original name of each file, unless the original extension is “.csv”, “.parquet”, “.avro”, or “.orc” (in which case the old extension will be replaced).

  • cats (list of str, optional) – List of categorical columns

  • conts (list of str, optional) – List of continuous columns

  • labels (list of str, optional) – List of label columns

  • method ({"subgraph", "worker"}) – General algorithm to use for the parallel graph execution. In order to minimize memory pressure, to_parquet will use a “subgraph” by default. This means that the full Dask task graph is segmented into a distinct subgraph for each output file (or output-file group). Each of these subgraphs is then executed, in full, by the same worker (as a single large task). In some cases, it may be preferable to prioritize concurrency. In that case, a worker-based approach can be used by specifying method="worker".

  • write_hugectr_keyset (bool, optional) – Whether to write a HugeCTR keyset output file (“_hugectr.keyset”). Writing this file can be very slow, and should only be done if you are planning to ingest the output data with HugeCTR. Default is False.
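The rules above for how method, output_files, and out_files_per_proc determine the number of files written can be condensed into a small function. This is a sketch derived only from the parameter descriptions; expected_output_files is a hypothetical helper, and the library itself may apply additional validation that is not modeled here.

```python
def expected_output_files(method, n_workers, npartitions,
                          output_files=None, out_files_per_proc=None):
    """Estimate the total number of output files from the documented rules."""
    if method == "worker":
        # Always the number of workers times out_files_per_proc (default 1).
        return n_workers * (out_files_per_proc or 1)
    if method != "subgraph":
        raise ValueError(f"unknown method: {method!r}")
    if isinstance(output_files, (dict, list)):
        # Explicit file names (or a name-to-partitions mapping) fix the count.
        return len(output_files)
    if output_files is not None:
        return int(output_files)
    if out_files_per_proc is None:
        # Default: one file per underlying Dask partition.
        return npartitions
    return out_files_per_proc * n_workers


worker_total = expected_output_files("worker", n_workers=4, npartitions=10,
                                     out_files_per_proc=2)
subgraph_default = expected_output_files("subgraph", n_workers=4, npartitions=10)
subgraph_per_proc = expected_output_files("subgraph", n_workers=4, npartitions=10,
                                          out_files_per_proc=2)
subgraph_mapping = expected_output_files(
    "subgraph", n_workers=4, npartitions=3,
    output_files={"a.parquet": [0, 1], "b.parquet": [2]},
)
```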

to_hugectr(output_path, cats, conts, labels, shuffle=None, file_partition_map=None, out_files_per_proc=None, num_threads=0, dtypes=None)

Writes out to a hugectr dataset

Parameters
  • output_path (string) – Path to write processed/shuffled output data

  • cats (list of str) – List of categorical columns

  • conts (list of str) – List of continuous columns

  • labels (list of str) – List of label columns

  • shuffle (merlin.io.Shuffle, optional) – How to shuffle the output dataset. Shuffling is only performed if the data is written to disk. For all options, other than None (which means no shuffling), the partitions of the underlying dataset/ddf will be randomly ordered. If PER_PARTITION is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a number (out_files_per_proc) of output files. Output files are distinctly mapped to each worker process. If PER_WORKER is specified, each worker will follow the same procedure as PER_PARTITION, but will re-shuffle each file after all data is persisted. This results in a full shuffle of the data processed by each worker. To improve performance, this option currently uses host-memory BytesIO objects for the intermediate persist stage. The FULL option is not yet implemented.

  • file_partition_map (dict) – Dictionary mapping of output file names to partition indices that should be written to that file name. If this argument is passed, only the partitions included in the dictionary will be written to disk, and the out_files_per_proc argument will be ignored.

  • out_files_per_proc (integer) – Number of files to create (per process) after shuffling the data

  • num_threads (integer) – Number of IO threads to use for writing the output dataset. For 0 (default), no dedicated IO threads will be used.

  • dtypes (dict) – Dictionary containing desired datatypes for output columns. Keys are column names, values are datatypes.
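The difference between the PER_PARTITION and PER_WORKER shuffle options described above can be simulated for a single worker with in-memory lists. This is a rough sketch under stated assumptions: write_shuffled is a hypothetical helper, the "files" are plain lists, and the round-robin split across files is an assumption made for illustration rather than the library's actual splitting strategy.

```python
import random


def write_shuffled(partitions, out_files_per_proc=2, per_worker=False, seed=0):
    """Simulate one worker writing its partitions to in-memory 'files'."""
    rng = random.Random(seed)
    parts = list(partitions)
    rng.shuffle(parts)  # partition order is randomized for every shuffle option
    files = [[] for _ in range(out_files_per_proc)]
    for part in parts:
        rows = list(part)
        rng.shuffle(rows)  # PER_PARTITION: shuffle rows within the partition
        for i, row in enumerate(rows):
            # Assumed round-robin split across this worker's output files.
            files[i % out_files_per_proc].append(row)
    if per_worker:
        for rows in files:
            rng.shuffle(rows)  # PER_WORKER: re-shuffle each file at the end
    return files


partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
per_partition_files = write_shuffled(partitions, per_worker=False)
per_worker_files = write_shuffled(partitions, per_worker=True)
```

In both modes every input row is written exactly once. With per_worker=True, rows from different partitions are additionally interleaved within each file, which corresponds to the "full shuffle of the data processed by each worker" described above.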

compute(*args, **kwargs)

head(*args, **kwargs)

persist(*args, **kwargs)

tail(*args, **kwargs)

to_npy(output_file: str, append: bool = False)

Converts a dataset into an npy file. Append mode can be used if the data is larger than memory.

Parameters
  • output_file (str) – The output file path for the resulting npy file

  • append (bool, optional) – Enables append mode for larger-than-memory data, by default False

property num_rows

property npartitions

validate_dataset(**kwargs)

Validate for efficient processing.

The purpose of this method is to validate that the Dataset object meets the minimal requirements for efficient NVTabular processing. For now, the only criterion is that the data must be in parquet format.

Example Usage:

dataset = Dataset("/path/to/data_pq", engine="parquet")
assert dataset.validate_dataset()

Parameters

**kwargs – Key-word arguments to pass down to the engine’s validate_dataset method. For the recommended parquet format, these arguments include add_metadata_file, row_group_max_size, file_min_size, and require_metadata_file. For more information, see ParquetDatasetEngine.validate_dataset.

Returns

valid – True if the input dataset is valid for efficient NVTabular processing.

Return type

bool

regenerate_dataset(output_path, columns=None, output_format='parquet', compute=True, **kwargs)

EXPERIMENTAL: Regenerate an NVTabular Dataset for efficient processing by writing out new Parquet files. In contrast to default to_parquet behavior, this method preserves the original ordering.

Example Usage:

dataset = Dataset("/path/to/data_pq", engine="parquet")
dataset.regenerate_dataset(
    out_path, part_size="1MiB", file_size="10MiB"
)

Parameters
  • output_path (string) – Root directory path to use for the new (regenerated) dataset.

  • columns (list(string), optional) – Subset of columns to include in the regenerated dataset.

  • output_format (string, optional) – Format to use for regenerated dataset. Only “parquet” (default) is currently supported.

  • compute (bool, optional) – Whether to compute the task graph or to return a Delayed object. By default, the graph will be executed.

  • **kwargs – Key-word arguments to pass down to the engine’s regenerate_dataset method. See ParquetDatasetEngine.regenerate_dataset for more information.

Returns

result – If compute=True (default), the return value will be an integer corresponding to the number of generated data files. If False, the returned value will be a Delayed object.

Return type

int or Delayed

infer_schema(n=1)

Create a schema containing the column names and inferred dtypes of the Dataset

Parameters

n (int, optional) – Number of rows to sample to infer the dtypes. Defaults to 1.

sample_dtypes(n=1, annotate_lists=False)

Return the real dtypes of the Dataset

Use cached metadata if this operation was already performed. Otherwise, call down to the underlying engine for sampling logic.