Source code for nvtabular.loader.torch

#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
import pandas as pd
import torch
from torch.utils.dlpack import from_dlpack

from merlin.core.dispatch import HAS_GPU

from .backend import DataLoader


class IterDL(torch.utils.data.IterableDataset):
    """Iterable dataset that streams parquet files as pandas DataFrame batches.

    Parameters
    ----------
    file_paths : iterable
        Paths handed one at a time to ``pandas.read_parquet``.
    batch_size : int
        Maximum number of rows per yielded DataFrame; the last batch of a
        file may be smaller.
    shuffle : bool
        When true, the rows *inside* each yielded batch are shuffled and the
        batch index is reset. Batches are still produced in file order.
    """

    def __init__(self, file_paths, batch_size=1, shuffle=False):
        self.file_paths = file_paths
        self.batch_size = batch_size
        self.shuffle = shuffle

    def _frame_batches(self, frame):
        """Yield consecutive row slices of ``frame``, ``batch_size`` rows each."""
        total_rows = frame.shape[0]
        for lo in range(0, total_rows, self.batch_size):
            batch = frame[lo : lo + self.batch_size]
            if self.shuffle:
                # Shuffle within the batch only, then drop the old row labels.
                batch = batch.sample(frac=1).reset_index(drop=True)
            yield batch

    def __iter__(self):
        # Files are read lazily, one at a time, as the consumer advances.
        for path in self.file_paths:
            yield from self._frame_batches(pd.read_parquet(path))
class TorchAsyncItr(torch.utils.data.IterableDataset, DataLoader):
    """This class creates batches of tensor. Each batch size is specified by the user.
    The data input requires an NVTabular dataset. Handles spillover to ensure all
    batches are the specified size until the final batch.

    Parameters
    -----------
    dataset : NVTabular dataset
    cats : [str]
        the list of categorical columns in the dataset
    conts : [str]
        the list of continuous columns in the dataset
    labels : [str]
        the list of label columns in the dataset
    batch_size : int
        the size of each batch to supply to the model
    shuffle : bool
        enable/disable shuffling of dataset
    parts_per_chunk : int
        number of partitions from the iterator, an NVTabular Dataset, to concatenate into a "chunk"
    device : int
        device id of selected GPU
    sparse_names : [str]
        list with column names of columns that should be represented as sparse tensors
    sparse_max : {str: int}
        dictionary of key: column_name + value: integer representing max sequence length for column
    sparse_as_dense : bool
        bool value to activate transforming sparse tensors to dense

    The remaining keyword arguments (``seed_fn``, ``global_size``,
    ``global_rank``, ``drop_last``) are forwarded unchanged to
    ``DataLoader.__init__``.
    """

    def __init__(
        self,
        dataset,
        cats=None,
        conts=None,
        labels=None,
        batch_size=1,
        shuffle=False,
        seed_fn=None,
        parts_per_chunk=1,
        device=None,
        global_size=None,
        global_rank=None,
        drop_last=False,
        sparse_names=None,
        sparse_max=None,
        sparse_as_dense=False,
    ):
        # Only the backend DataLoader initializer is invoked here; it receives
        # every argument, with the column lists renamed to its keyword names.
        DataLoader.__init__(
            self,
            dataset,
            batch_size,
            shuffle,
            cat_names=cats,
            cont_names=conts,
            label_names=labels,
            seed_fn=seed_fn,
            parts_per_chunk=parts_per_chunk,
            device=device,
            global_size=global_size,
            global_rank=global_rank,
            drop_last=drop_last,
            sparse_names=sparse_names,
            sparse_max=sparse_max,
            sparse_as_dense=sparse_as_dense,
        )

    def __iter__(self):
        # Both bases could supply __iter__; explicitly use the backend's.
        return DataLoader.__iter__(self)

    def _get_device_ctx(self, dev):
        """Return ``torch.device("cpu")`` for "cpu", else a ``torch.cuda.device`` for ``dev``."""
        if dev == "cpu":
            return torch.device("cpu")
        return torch.cuda.device("cuda:{}".format(dev))

    def _pack(self, gdf):
        """Return ``gdf`` untouched on CPU, otherwise its DLPack export."""
        if self.device == "cpu":
            return gdf
        return gdf.to_dlpack()

    def _unpack(self, dlpack):
        """Turn the output of ``_pack`` into a torch tensor.

        On CPU the argument is the frame returned by ``_pack`` and its
        ``.values`` array is converted with ``torch.Tensor``; a two-dimensional
        single-column array is squeezed. Otherwise the DLPack capsule is
        imported with ``from_dlpack``.
        """
        if self.device == "cpu":
            if (
                len(dlpack.values.shape) == 2
                and dlpack.values.shape[1] == 1
                and isinstance(dlpack.values[0], np.ndarray)
            ):
                return torch.squeeze(torch.Tensor(dlpack.values))
            return torch.Tensor(dlpack.values)
        return from_dlpack(dlpack)

    def _to_tensor(self, gdf, dtype=None):
        """Convert ``gdf`` to a tensor, casting to ``dtype`` when one is given."""
        dl_pack = self._pack(gdf)
        tensor = self._unpack(dl_pack)
        # Tensor.type() called with None returns the type *name* (a str), not a
        # tensor, so the default dtype must skip the cast entirely.
        if dtype is None:
            return tensor
        return tensor.type(dtype)

    def _split_fn(self, tensor, idx, axis=0):
        return torch.split(tensor, idx, dim=axis)

    def _tensor_split(self, tensor, idx, axis=0):
        # Use the documented ``dim`` keyword, consistent with ``_split_fn``.
        return torch.tensor_split(tensor, idx, dim=axis)

    @property
    def _LONG_DTYPE(self):
        return torch.long

    @property
    def _FLOAT32_DTYPE(self):
        return torch.float32

    def _pull_values_offsets(self, values_offset):
        """Split a column into flat values, offsets and per-row lengths.

        ``values_offset`` is either a ``(values, offsets)`` tuple or a single
        tensor of values; in the latter case one offset per value is generated.

        Returns ``(values, offsets, diff_offsets, num_rows)`` where ``offsets``
        has ``len(values)`` appended as a final entry, ``diff_offsets`` holds
        the differences between consecutive offsets, and ``num_rows`` is the
        offset count before that final entry was appended.
        """
        if isinstance(values_offset, tuple):
            values = values_offset[0].flatten()
            offsets = values_offset[1].flatten()
        else:
            values = values_offset.flatten()
            offsets = torch.arange(values.size()[0], device=self.device)
        num_rows = len(offsets)
        # Allocate the closing offset on the same device as ``offsets`` so the
        # concatenation cannot fail with a device mismatch (e.g. a CPU loader
        # on a machine that has a GPU, or a non-default GPU).
        end_offset = torch.tensor([len(values)], dtype=torch.long, device=offsets.device)
        offsets = torch.cat([offsets, end_offset])
        diff_offsets = offsets[1:] - offsets[:-1]
        return values, offsets, diff_offsets, num_rows

    def _get_max_seq_len(self, diff_offsets):
        return int(diff_offsets.max())

    # Building the indices to reconstruct the sparse tensors
    def _get_indices(self, offsets, diff_offsets):
        """Return an ``(nnz, 2)`` tensor of ``(row, column)`` index pairs."""
        row_ids = torch.arange(len(offsets) - 1, device=self.device)
        row_ids_repeated = torch.repeat_interleave(row_ids, diff_offsets)
        row_offset_repeated = torch.repeat_interleave(offsets[:-1], diff_offsets)
        # Column index = position in the flat values minus the row's start offset.
        col_ids = torch.arange(len(row_offset_repeated), device=self.device) - row_offset_repeated
        indices = torch.cat([row_ids_repeated.unsqueeze(-1), col_ids.unsqueeze(-1)], dim=1)
        return indices

    def _get_sparse_tensor(self, values, indices, num_rows, seq_limit):
        """Build a ``(num_rows, seq_limit)`` sparse COO tensor, densified if configured."""
        sparse_tensor = torch.sparse_coo_tensor(
            indices.T, values, torch.Size([num_rows, seq_limit]), device=self.device
        )
        if self.sparse_as_dense:
            sparse_tensor = sparse_tensor.to_dense()
        return sparse_tensor

    def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit):
        indices = self._get_indices(offsets, diff_offsets)
        return self._get_sparse_tensor(values, indices, num_rows, seq_limit)
class DLDataLoader(torch.utils.data.DataLoader):
    """
    This class is an extension of the torch dataloader.
    It is required to support the FastAI framework.

    It adds a ``device`` property and makes ``len()`` report the length of the
    wrapped dataset.
    """

    @property
    def device(self):
        """The CUDA device when CUDA is available, otherwise the CPU device."""
        backend = "cuda" if torch.cuda.is_available() else "cpu"
        return torch.device(backend)

    def __len__(self):
        # Delegates to the wrapped dataset's own length.
        wrapped = self.dataset
        return len(wrapped)