merlin.io package
- class merlin.io.Dataset(path_or_source, engine=None, npartitions=None, part_size=None, part_mem_fraction=None, storage_options=None, dtypes=None, client='auto', cpu=None, base_dataset=None, schema=None, **kwargs)
Bases: object
Universal external-data wrapper for NVTabular
The NVTabular Workflow and DataLoader-related APIs require all external data to be converted to the universal Dataset type. The main purpose of this class is to abstract away the raw format of the data, and to allow other NVTabular classes to reliably materialize a dask_cudf.DataFrame collection (and/or collection-based iterator) on demand.
A new Dataset object can be initialized from a variety of different raw-data formats. To initialize an object from a directory path or file list, the engine argument should be used to specify either “parquet” or “csv” format. If the first argument contains a list of files with a suffix of either “parquet” or “csv”, the engine can be inferred:
from merlin.io import Dataset

# Initialize Dataset with a parquet-dataset directory.
# must specify engine="parquet"
dataset = Dataset("/path/to/data_pq", engine="parquet")

# Initialize Dataset with list of csv files.
# engine="csv" argument is optional
dataset = Dataset(["file_0.csv", "file_1.csv"])
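The suffix-based inference described above can be pictured with a small standalone sketch. The helper name `infer_engine` and the suffix table are hypothetical and are not part of merlin.io; the sketch only illustrates the idea that a list of files sharing a "parquet" or "csv" suffix determines the engine, while a bare directory path does not.

```python
from pathlib import PurePosixPath

# Hypothetical lookup table: NOT the merlin.io implementation,
# only an illustration of suffix-based engine inference.
SUPPORTED_SUFFIXES = {".parquet": "parquet", ".csv": "csv"}


def infer_engine(paths):
    """Return "parquet" or "csv" if every path shares that suffix."""
    if isinstance(paths, str):
        paths = [paths]
    # Map each path to an engine name (None when the suffix is unknown).
    engines = {
        SUPPORTED_SUFFIXES.get(PurePosixPath(p).suffix.lower()) for p in paths
    }
    if len(engines) != 1 or None in engines:
        raise ValueError(
            "Cannot infer engine; pass engine='parquet' or engine='csv'."
        )
    return engines.pop()


print(infer_engine(["file_0.csv", "file_1.csv"]))  # csv
```

A directory path such as "/path/to/data_pq" has no recognizable suffix, so the sketch raises an error, which mirrors why the engine must be given explicitly in that case.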
Since NVTabular leverages fsspec as a file-system interface, the underlying data can be stored either locally or in a remote/cloud data store. To read from remote storage, like GCS or S3, the appropriate protocol should be prepended to the Dataset path argument(s), and any special backend parameters should be passed in a storage_options dictionary:
# Initialize Dataset with s3 parquet data
dataset = Dataset(
    "s3://bucket/path",
    engine="parquet",
    storage_options={'anon': True, 'use_ssl': False},
)
By default, both parquet and csv-based data will be converted to a Dask-DataFrame collection with a maximum partition size of roughly 12.5 percent of the total memory on a single device. The partition size can be changed to a different fraction of total memory on a single device with the part_mem_fraction argument. Alternatively, a specific byte size can be specified with the part_size argument:
# Dataset partitions will be ~10% single-GPU memory (or smaller)
dataset = Dataset("bigfile.parquet", part_mem_fraction=0.1)

# Dataset partitions will be ~1GB (or smaller)
dataset = Dataset("bigfile.parquet", part_size="1GB")
Note that, if both the fractional and literal options are used at the same time, part_size will take precedence. Also, for parquet-formatted data, the partitioning is done at the row-group level, and the byte-size of the first row-group (after CuDF conversion) is used to map all other partitions. Therefore, if the distribution of row-group sizes is not uniform, the partition sizes will not be balanced.
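The precedence rule and the row-group mapping can be made concrete with a rough, standalone sketch. All three helper names are hypothetical, the decimal interpretation of "GB" is an assumption, and the real engine may compute these values differently; only the 0.125 default and the "part_size wins" rule come from the text above.

```python
def parse_size(size):
    """Convert an int or a string such as "1GB" into a byte count.

    Assumption: decimal units (1GB == 10**9 bytes); the library may differ.
    """
    if isinstance(size, int):
        return size
    units = {"KB": 10**3, "MB": 10**6, "GB": 10**9}
    text = size.strip().upper()
    for unit, factor in units.items():
        if text.endswith(unit):
            return int(float(text[: -len(unit)]) * factor)
    return int(text)


def resolve_part_size(device_mem_bytes, part_size=None, part_mem_fraction=None):
    """part_size wins over part_mem_fraction; 0.125 is the documented default."""
    if part_size is not None:
        return parse_size(part_size)
    fraction = 0.125 if part_mem_fraction is None else part_mem_fraction
    return int(device_mem_bytes * fraction)


def row_groups_per_partition(first_row_group_bytes, part_size_bytes):
    """Use the first row-group's size as the estimate for every row-group."""
    return max(1, part_size_bytes // first_row_group_bytes)


# Hypothetical 16GB device with a 128MB first row-group.
size = resolve_part_size(16 * 10**9, part_size="1GB", part_mem_fraction=0.1)
print(size, row_groups_per_partition(128 * 10**6, size))
```

Because every partition is sized from the first row-group alone, a file whose later row-groups are much larger than the first would yield partitions well above the target, which is the imbalance the note warns about.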
In addition to handling data stored on disk, a Dataset object can also be initialized from an existing CuDF/Pandas DataFrame, or from a Dask-DataFrame collection (e.g. dask_cudf.DataFrame). For these in-memory formats, the size/number of partitions will not be modified. That is, a CuDF/Pandas DataFrame (or PyArrow Table) will produce a single-partition collection, while the number/size of a Dask-DataFrame collection will be preserved:
import cudf
import dask_cudf

# Initialize from CuDF DataFrame (creates 1 partition)
gdf = cudf.DataFrame(...)
dataset = Dataset(gdf)

# Initialize from Dask-CuDF DataFrame (preserves partitions)
ddf = dask_cudf.read_parquet(...)
dataset = Dataset(ddf)
Since the Dataset API can both ingest and output a Dask collection, it is straightforward to transform data either before or after an NVTabular workflow is executed. This means that some complex pre-processing operations that are not yet supported in NVTabular can still be accomplished with the Dask-CuDF API:
# Sort input data before final Dataset initialization
# Warning: Global sorting requires significant device memory!
ddf = Dataset("/path/to/data_pq", engine="parquet").to_ddf()
ddf = ddf.sort_values("user_rank", ignore_index=True)
dataset = Dataset(ddf)
Dataset Optimization Tips (DOTs)
The NVTabular dataset should be created from Parquet files in order to get the best possible performance, preferably with a row group size of around 128MB. While NVTabular also supports reading from CSV files, reading CSV can be over twice as slow as reading from Parquet. Take a look at this notebook for an example of transforming the original Criteo CSV dataset into a new Parquet dataset optimized for use with NVTabular.
- Parameters:
path_or_source (str, list of str, or <dask.dataframe|cudf|pd>.DataFrame) – Dataset path (or list of paths), or a DataFrame. If string, should specify a specific file or directory path. If this is a directory path, the directory structure must be flat (nested directories are not yet supported).
engine (str or DatasetEngine) – DatasetEngine object or string identifier of engine. Current string options include: (“parquet”, “csv”, “avro”). This argument is ignored if path_or_source is a DataFrame type.
npartitions (int) – Desired number of Dask-collection partitions to produce in the to_ddf method when path_or_source corresponds to a DataFrame type. This argument is ignored for file-based path_or_source input.
part_size (str or int) – Desired size (in bytes) of each Dask partition. If None, part_mem_fraction will be used to calculate the partition size. Note that the underlying engine may allow other custom kwargs to override this argument. This argument is ignored if path_or_source is a DataFrame type.
part_mem_fraction (float (default 0.125)) – Fractional size of desired dask partitions (relative to GPU memory capacity). Ignored if part_size is passed directly. Note that the underlying engine may allow other custom kwargs to override this argument. This argument is ignored if path_or_source is a DataFrame type. If cpu=True, this value will be relative to the total host memory detected by the client process.
storage_options (None or dict) – Further parameters to pass to the bytes backend. This argument is ignored if path_or_source is a DataFrame type.
cpu (bool) – WARNING: Experimental Feature! Whether NVTabular should keep all data in cpu memory when the Dataset is converted to an internal Dask collection. The default value is False, unless cudf and dask_cudf are not installed (in which case the default is True). In the future, if True, NVTabular will NOT use any available GPU devices for down-stream processing. NOTE: Down-stream ops and output do not yet support a Dataset generated with cpu=True.
base_dataset (Dataset) – Optional reference to the original “base” Dataset object used to construct the current Dataset instance. This object is used to preserve file-partition mapping information.
schema (Schema) – Optional argument, to support custom user defined Schemas. This overrides the derived schema behavior.
**kwargs – Key-word arguments to pass through to Dask.dataframe IO function. For the Parquet engine(s), notable arguments include filters and aggregate_files (the latter is experimental).
- to_ddf(columns=None, shuffle=False, seed=None)
Convert Dataset object to dask_cudf.DataFrame
- Parameters:
columns (str or list(str); default None) – Columns to include in output DataFrame. If not specified, the output will contain all known columns in the Dataset.
shuffle (bool; default False) – Whether to shuffle the order of partitions in the output dask_cudf.DataFrame. Note that this does not shuffle the rows within each partition. This is because the data is not actually loaded into memory for this operation.
seed (int; Optional) – The random seed to use if shuffle=True. If nothing is specified, the current system time will be used by the random std library.
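As a minimal illustration of what shuffle=True means here, the following standalone sketch reorders whole partitions while leaving the rows inside each partition untouched. The helper `shuffled_partition_order` and the toy list-of-lists "partitions" are hypothetical stand-ins; this is not how the library builds its Dask graph.

```python
import random


def shuffled_partition_order(npartitions, seed=None):
    """Return partition indices in a shuffled order; rows are untouched."""
    order = list(range(npartitions))
    # With seed=None the standard library picks its own seed.
    random.Random(seed).shuffle(order)
    return order


# Toy "partitions": each inner list stands in for one partition's rows.
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
order = shuffled_partition_order(len(partitions), seed=42)
shuffled = [partitions[i] for i in order]
print(order)
```

Only the index list is permuted, so no data needs to be loaded to decide the new order, which matches the reasoning given for the shuffle parameter.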
- property file_partition_map
- property partition_lens
- shuffle_by_keys(keys, hive_data=None, npartitions=None)
Shuffle the in-memory Dataset so that all unique-key combinations are moved to the same partition.
- Parameters:
hive_data (bool; default None) – Whether the dataset is backed by a hive-partitioned dataset (with the keys encoded in the directory structure). By default, the Dataset’s file_partition_map property will be inspected to infer this setting. When hive_data is True, the number of output partitions will correspond to the number of unique key combinations in the dataset.
npartitions (int; default None) – Number of partitions in the output Dataset. For hive-partitioned data, this value should be <= the number of unique key combinations (the default), otherwise it will be ignored. For data that is not hive-partitioned, the npartitions input should be <= the original partition count, otherwise it will be ignored.
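The goal of moving all rows with the same key combination into one partition can be sketched with plain hash partitioning. This is an assumption-laden illustration: the helper `hash_partition`, the use of CRC32, and the list-of-dicts row format are all hypothetical and say nothing about how the library performs the shuffle.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, keys, npartitions):
    """Assign each row to a partition based on a hash of its key columns."""
    partitions = defaultdict(list)
    for row in rows:
        # Serialize the key combination so equal keys hash identically.
        key = repr(tuple(row[k] for k in keys)).encode()
        partitions[zlib.crc32(key) % npartitions].append(row)
    return [partitions[i] for i in range(npartitions)]


pairs = [("a", 1), ("b", 1), ("a", 1), ("c", 2), ("b", 1), ("a", 2)]
rows = [{"user": u, "day": d, "x": i} for i, (u, d) in enumerate(pairs)]
parts = hash_partition(rows, ["user", "day"], npartitions=3)
for index, part in enumerate(parts):
    print(index, [(r["user"], r["day"]) for r in part])
```

Several key combinations may share a partition, but no combination is ever split across two partitions.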
- repartition(npartitions=None, partition_size=None)
Repartition the underlying ddf, and return a new Dataset
- Parameters:
npartitions (int; default None) – Number of partitions in output Dataset. Only used if partition_size isn’t specified.
partition_size (int or str; default None) – Max number of bytes of memory for each partition. Use numbers or strings like ‘5MB’. If specified, npartitions will be ignored.
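The interplay of the two arguments can be sketched as follows. The planner below is purely hypothetical: it greedily packs adjacent partitions under a byte limit and never splits an oversized partition, which is a simplification and not a description of what Dask does internally. The one behavior taken from the text above is that partition_size, when given, overrides npartitions.

```python
def plan_repartition(partition_bytes, npartitions=None, partition_size=None):
    """Return groups of input-partition indices, one group per output partition."""
    n = len(partition_bytes)
    if partition_size is not None:
        # partition_size takes precedence; npartitions is ignored.
        groups, current, used = [], [], 0
        for i, size in enumerate(partition_bytes):
            if current and used + size > partition_size:
                groups.append(current)
                current, used = [], 0
            current.append(i)
            used += size
        if current:
            groups.append(current)
        return groups
    if npartitions is None:
        return [[i] for i in range(n)]
    # Split the indices into npartitions contiguous, near-equal groups.
    bounds = [j * n // npartitions for j in range(npartitions + 1)]
    return [list(range(bounds[j], bounds[j + 1])) for j in range(npartitions)]


sizes = [40, 40, 40, 40, 40]  # hypothetical per-partition byte sizes
print(plan_repartition(sizes, npartitions=2, partition_size=100))
print(plan_repartition(sizes, npartitions=2))
```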
- classmethod merge(left, right, **kwargs)
Merge two Dataset objects
Produces a new Dataset object. If the cpu Dataset attributes do not match, the right side will be modified. See Dask-Dataframe merge documentation for more information. Example usage:
ds_1 = Dataset("file.parquet")
ds_2 = Dataset(cudf.DataFrame(...))
ds_merged = Dataset.merge(ds_1, ds_2, on="foo", how="inner")
- to_iter(columns=None, indices=None, shuffle=False, seed=None, use_file_metadata=None, epochs=1)
Convert Dataset object to a cudf.DataFrame iterator.
Note that this method will use to_ddf to produce a dask_cudf.DataFrame, and materialize a single partition for each iteration.
- Parameters:
columns (str or list(str); default None) – Columns to include in each DataFrame. If not specified, the outputs will contain all known columns in the Dataset.
indices (list(int); default None) – A specific list of partition indices to iterate over. If nothing is specified, all partitions will be returned in order (or the shuffled order, if shuffle=True).
shuffle (bool; default False) – Whether to shuffle the order of dask_cudf.DataFrame partitions used by the iterator. If the indices argument is specified, those indices correspond to the partition indices AFTER the shuffle operation.
seed (int; Optional) – The random seed to use if shuffle=True. If nothing is specified, the current system time will be used by the random std library.
use_file_metadata (bool; Optional) – Whether to allow the returned DataFrameIter object to use file metadata from the base_dataset to estimate the row-count. By default, the file-metadata optimization will only be used if the current Dataset is backed by a file-based engine. Otherwise, it is possible that an intermediate transform has modified the row-count.
epochs (int) – Number of dataset passes to include within a single iterator. This option is used for multi-epoch data-loading. Default is 1.
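How indices, shuffle, and epochs combine can be shown with a small generator over partition indices. The function `iteration_order` is a hypothetical stand-in for the iterator's bookkeeping, and the sketch assumes every epoch reuses the same order, which the text above does not state.

```python
import random


def iteration_order(npartitions, indices=None, shuffle=False, seed=None, epochs=1):
    """Yield the partition index visited at each step of the iterator."""
    order = list(range(npartitions))
    if shuffle:
        random.Random(seed).shuffle(order)
    if indices is not None:
        # The indices refer to positions AFTER the shuffle.
        order = [order[i] for i in indices]
    # Assumption: each epoch replays the same order.
    for _ in range(epochs):
        yield from order


print(list(iteration_order(3, epochs=2)))  # [0, 1, 2, 0, 1, 2]
print(list(iteration_order(4, indices=[0, 2], shuffle=True, seed=0)))
```

With shuffle=True, indices=[0, 2] selects the first and third entries of the shuffled order rather than the original partitions 0 and 2.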
- to_parquet(output_path, shuffle=None, preserve_files=False, output_files=None, out_files_per_proc=None, row_group_size=None, num_threads=0, dtypes=None, cats=None, conts=None, labels=None, suffix='.parquet', partition_on=None, method='subgraph', write_hugectr_keyset=False)
Writes out to a parquet dataset
- Parameters:
output_path (string) – Path to write processed/shuffled output data
shuffle (merlin.io.Shuffle enum) – How to shuffle the output dataset. For all options, other than None (which means no shuffling), the partitions of the underlying dataset/ddf will be randomly ordered. If PER_PARTITION is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a number (out_files_per_proc) of output files. Output files are distinctly mapped to each worker process. If PER_WORKER is specified, each worker will follow the same procedure as PER_PARTITION, but will re-shuffle each file after all data is persisted. This results in a full shuffle of the data processed by each worker. To improve performance, this option currently uses host-memory BytesIO objects for the intermediate persist stage. The FULL option is not yet implemented.
partition_on (str or list(str)) – Columns to use for hive-partitioning. If this option is used, preserve_files, output_files, and out_files_per_proc cannot be specified, and method will be ignored. Also, the PER_WORKER shuffle will not be supported.
preserve_files (bool) – Whether to preserve the original file-to-partition mapping of the base dataset. This option requires method=”subgraph”, and is only available if the base dataset is known, and if it corresponds to csv or parquet format. If True, the out_files_per_proc option will be ignored. Default is False.
output_files (dict, list or int) – The total number of desired output files. This option requires method=”subgraph”. When out_files_per_proc=None, the default is the number of underlying Dask partitions. When out_files_per_proc is set to an integer, the default is the product of that integer and the total number of workers in the Dask cluster. For further output-file control, this argument may also be used to pass a dictionary mapping the output file names to partition indices, or a list of desired output-file names.
out_files_per_proc (integer) – Number of output files that each process will use to shuffle an input partition. Default is 1. If method=”worker”, the total number of output files will always be the total number of Dask workers, multiplied by this argument. If method=”subgraph”, the total number of files is determined by output_files (and out_files_per_proc must be 1 if a dictionary is specified).
row_group_size (integer) – Maximum number of rows to include in each Parquet row-group. By default, the maximum row-group size will be chosen by the backend Parquet engine (cudf or pyarrow). Note that cudf currently prohibits this value from being less than 5000 rows. If smaller row-groups are necessary, try calling to_cpu() before writing to disk.
num_threads (integer) – Number of IO threads to use for writing the output dataset. For 0 (default), no dedicated IO threads will be used.
dtypes (dict) – Dictionary containing desired datatypes for output columns. Keys are column names, values are datatypes.
suffix (str or False) – File-name extension to use for all output files. This argument is ignored if a specific list of file names is specified using the output_files option. If preserve_files=True, this suffix will be appended to the original name of each file, unless the original extension is “.csv”, “.parquet”, “.avro”, or “.orc” (in which case the old extension will be replaced).
method ({"subgraph", "worker"}) – General algorithm to use for the parallel graph execution. In order to minimize memory pressure, to_parquet will use a “subgraph” by default. This means that we segment the full Dask task graph into a distinct subgraph for each output file (or output-file group). Then, each of these subgraphs is executed, in full, by the same worker (as a single large task). In some cases, it may be preferable to prioritize concurrency. In that case, a worker-based approach can be used by specifying method=”worker”.
write_hugectr_keyset (bool, optional) – Whether to write a HugeCTR keyset output file (“_hugectr.keyset”). Writing this file can be very slow, and should only be done if you are planning to ingest the output data with HugeCTR. Default is False.
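Two of the rules in this parameter list are easy to misread, so here is a rough standalone sketch of both: how the suffix is applied when preserve_files=True, and how the default number of output files follows from method and out_files_per_proc. Both helper names are hypothetical, only string suffixes are handled, and the functions restate the documented rules rather than reproduce the library's code.

```python
from pathlib import PurePosixPath

# Extensions that are replaced rather than appended to (from the text above).
KNOWN_EXTENSIONS = {".csv", ".parquet", ".avro", ".orc"}


def output_name(original_name, suffix=".parquet"):
    """Name of an output file when preserve_files=True (string suffix only)."""
    path = PurePosixPath(original_name)
    if path.suffix in KNOWN_EXTENSIONS:
        return str(path.with_suffix(suffix))
    return original_name + suffix


def default_output_file_count(method, n_partitions, n_workers, out_files_per_proc=None):
    """Default total number of files when output_files is not given."""
    if method == "worker":
        # Always workers times out_files_per_proc (which defaults to 1).
        return n_workers * (out_files_per_proc or 1)
    if out_files_per_proc is None:
        return n_partitions
    return n_workers * out_files_per_proc


print(output_name("part_0.csv"))  # part_0.parquet
print(output_name("data.txt"))    # data.txt.parquet
print(default_output_file_count("subgraph", n_partitions=10, n_workers=2))  # 10
```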
- to_hugectr(output_path, cats, conts, labels, shuffle=None, file_partition_map=None, out_files_per_proc=None, num_threads=0, dtypes=None)
Writes out to a hugectr dataset
- Parameters:
output_path (string) – Path to write processed/shuffled output data
shuffle (merlin.io.Shuffle, optional) – How to shuffle the output dataset. Shuffling is only performed if the data is written to disk. For all options, other than None (which means no shuffling), the partitions of the underlying dataset/ddf will be randomly ordered. If PER_PARTITION is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a number (out_files_per_proc) of output files. Output files are distinctly mapped to each worker process. If PER_WORKER is specified, each worker will follow the same procedure as PER_PARTITION, but will re-shuffle each file after all data is persisted. This results in a full shuffle of the data processed by each worker. To improve performance, this option currently uses host-memory BytesIO objects for the intermediate persist stage. The FULL option is not yet implemented.
file_partition_map (dict) – Dictionary mapping of output file names to partition indices that should be written to that file name. If this argument is passed, only the partitions included in the dictionary will be written to disk, and the out_files_per_proc argument will be ignored.
out_files_per_proc (integer) – Number of files to create (per process) after shuffling the data
num_threads (integer) – Number of IO threads to use for writing the output dataset. For 0 (default), no dedicated IO threads will be used.
dtypes (dict) – Dictionary containing desired datatypes for output columns. Keys are column names, values are datatypes.
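The shape of a file_partition_map can be illustrated with a small helper that spreads chosen partition indices over a fixed number of files. The helper name, the round-robin policy, and the "file_N" names are all hypothetical; the only point taken from the text above is that the dictionary maps each output file name to the partition indices written to it, and that unlisted partitions are skipped.

```python
def build_file_partition_map(partition_indices, n_files, prefix="file_"):
    """Spread partition indices round-robin over n_files output files."""
    mapping = {f"{prefix}{i}": [] for i in range(n_files)}
    names = list(mapping)
    for position, partition in enumerate(partition_indices):
        mapping[names[position % n_files]].append(partition)
    return mapping


# Partition 3 is left out on purpose, so it would not be written.
print(build_file_partition_map([0, 1, 2, 4], n_files=2))
# {'file_0': [0, 2], 'file_1': [1, 4]}
```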
- compute(*args, **kwargs)
- head(*args, **kwargs)
- persist(*args, **kwargs)
- tail(*args, **kwargs)
- to_npy(output_file: str, append: bool = False)
Converts a dataset into an npy file. The output can be appended to (append=True) if the data is larger than memory.
- property num_rows
- property npartitions
- validate_dataset(**kwargs)
Validate for efficient processing.
The purpose of this method is to validate that the Dataset object meets the minimal requirements for efficient NVTabular processing. For now, this criterion requires the data to be in parquet format.
Example Usage:
dataset = Dataset("/path/to/data_pq", engine="parquet")
assert dataset.validate_dataset()
- Parameters:
**kwargs – Key-word arguments to pass down to the engine’s validate_dataset method. For the recommended parquet format, these arguments include add_metadata_file, row_group_max_size, file_min_size, and require_metadata_file. For more information, see ParquetDatasetEngine.validate_dataset.
- Returns:
valid – True if the input dataset is valid for efficient NVTabular processing.
- Return type:
bool
- regenerate_dataset(output_path, columns=None, output_format='parquet', compute=True, **kwargs)
EXPERIMENTAL: Regenerate an NVTabular Dataset for efficient processing by writing out new Parquet files. In contrast to default to_parquet behavior, this method preserves the original ordering.
Example Usage:
dataset = Dataset("/path/to/data_pq", engine="parquet")
dataset.regenerate_dataset(
    out_path, part_size="1MiB", file_size="10MiB"
)
- Parameters:
output_path (string) – Root directory path to use for the new (regenerated) dataset.
columns (list(string), optional) – Subset of columns to include in the regenerated dataset.
output_format (string, optional) – Format to use for regenerated dataset. Only “parquet” (default) is currently supported.
compute (bool, optional) – Whether to compute the task graph or to return a Delayed object. By default, the graph will be executed.
**kwargs – Key-word arguments to pass down to the engine’s regenerate_dataset method. See ParquetDatasetEngine.regenerate_dataset for more information.
- Returns:
result – If compute=True (default), the return value will be an integer corresponding to the number of generated data files. If False, the returned value will be a Delayed object.
- Return type:
int or Delayed