#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import glob
import inspect
import itertools
import logging
import os
import sys
import time
from typing import Any, Dict
logger = logging.getLogger(__name__)


def filter_kwargs(kwargs, thing_with_kwargs, filter_positional_or_keyword=True):
    """Returns the subset of `kwargs` that matches the signature of `thing_with_kwargs`"""
    sig = inspect.signature(thing_with_kwargs)
    if filter_positional_or_keyword:
        filter_keys = [
            param.name
            for param in sig.parameters.values()
            if param.kind == param.POSITIONAL_OR_KEYWORD
        ]
    else:
        filter_keys = [param.name for param in sig.parameters.values()]
    filtered_dict = {
        filter_key: kwargs[filter_key] for filter_key in filter_keys if filter_key in kwargs
    }
    return filtered_dict
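
# Example: keep only the kwargs accepted by a callable before forwarding them
# (illustrative sketch; `build_model` and its arguments are hypothetical):
# >>> def build_model(d_model, n_layers):
# ...     return (d_model, n_layers)
# >>> kwargs = {"d_model": 64, "n_layers": 2, "unused_flag": True}
# >>> build_model(**filter_kwargs(kwargs, build_model))
# (64, 2)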


def safe_json(data):
    """Checks whether `data` contains only JSON-serializable primitives and containers"""
    if data is None:
        return True
    elif isinstance(data, (bool, int, float, str)):
        return True
    elif isinstance(data, (tuple, list)):
        return all(safe_json(x) for x in data)
    elif isinstance(data, dict):
        return all(isinstance(k, str) and safe_json(v) for k, v in data.items())
    return False
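
# Example (illustrative):
# >>> safe_json({"lr": 1e-3, "layers": [64, 32], "name": "xlnet"})
# True
# >>> safe_json({"model": object()})
# False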


def get_filenames(data_paths, files_filter_pattern="*"):
    """Lists the files matching `files_filter_pattern` under each path in `data_paths`"""
    paths = [glob.glob(os.path.join(path, files_filter_pattern)) for path in data_paths]
    return list(itertools.chain.from_iterable(paths))
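
# Example (illustrative; paths are hypothetical and the result depends on the files present):
# >>> get_filenames(["/data/day_0", "/data/day_1"], files_filter_pattern="*.parquet")
# ['/data/day_0/train.parquet', '/data/day_1/train.parquet']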


def get_label_feature_name(feature_map: Dict[str, Any]) -> str:
    """
    Analyses the feature map config and returns the name of the label feature (e.g. item_id)
    """
    label_feature_config = list(
        k for k, v in feature_map.items() if "is_label" in v and v["is_label"]
    )
    if len(label_feature_config) == 0:
        raise ValueError("One feature must be configured as label (is_label = True)")
    if len(label_feature_config) > 1:
        raise ValueError("Only one feature can be selected as label (is_label = True)")
    label_name = label_feature_config[0]
    return label_name


def get_timestamp_feature_name(feature_map: Dict[str, Any]) -> str:
    """
    Analyses the feature map config and returns the name of the timestamp feature
    """
    timestamp_feature_name = list(k for k, v in feature_map.items() if v["dtype"] == "timestamp")
    if len(timestamp_feature_name) == 0:
        raise Exception('No feature has been configured as timestamp (dtype = "timestamp")')
    if len(timestamp_feature_name) > 1:
        raise Exception('Only one feature can be configured as timestamp (dtype = "timestamp")')
    timestamp_fname = timestamp_feature_name[0]
    return timestamp_fname


def get_parquet_files_names(data_args, time_indices, is_train, eval_on_test_set=False):
    """Builds the parquet file paths for the given time window indices"""
    if not isinstance(time_indices, list):
        time_indices = [time_indices]
    time_window_folders = [
        os.path.join(
            data_args.data_path,
            str(time_index).zfill(data_args.time_window_folder_pad_digits),
        )
        for time_index in time_indices
    ]
    if is_train:
        data_filename = "train.parquet"
    elif eval_on_test_set:
        data_filename = "test.parquet"
    else:
        data_filename = "valid.parquet"
    parquet_paths = [os.path.join(folder, data_filename) for folder in time_window_folders]
    # If paths are folders, list the parquet files within the folders
    # parquet_paths = get_filenames(parquet_paths, files_filter_pattern="*.parquet")
    return parquet_paths
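
# Example (illustrative; `data_args` is any object exposing `data_path` and
# `time_window_folder_pad_digits`, e.g. a parsed-arguments namespace):
# >>> from types import SimpleNamespace
# >>> data_args = SimpleNamespace(data_path="/data", time_window_folder_pad_digits=4)
# >>> get_parquet_files_names(data_args, time_indices=[1, 2], is_train=True)
# ['/data/0001/train.parquet', '/data/0002/train.parquet']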


class Timing:
    """A context manager that prints the execution time of the block it manages"""

    def __init__(self, message, file=sys.stdout, logger=None, one_line=True):
        self.message = message
        if logger is not None:
            self.default_logger = False
            self.one_line = False
            self.logger = logger
        else:
            self.default_logger = True
            self.one_line = one_line
            self.logger = None
        self.file = file

    def _log(self, message, newline=True):
        # pylint: disable=broad-except
        if self.default_logger:
            print(message, end="\n" if newline else "", file=self.file)
            try:
                self.file.flush()
            except Exception:
                pass
        else:
            self.logger.info(message)

    def __enter__(self):
        self.start = time.time()
        self._log(self.message, not self.one_line)

    def __exit__(self, exc_type, exc_value, traceback):
        self._log(
            "{}Done in {:.3f}s".format(
                "" if self.one_line else self.message, time.time() - self.start
            )
        )
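
# Example usage (the reported time will vary):
# >>> with Timing("Loading dataset... "):
# ...     time.sleep(0.1)
# Loading dataset... Done in 0.100s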


def get_object_size(obj, seen=None):
    """Recursively finds size of objects"""
    size = sys.getsizeof(obj)
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    # Important: mark as seen *before* entering recursion to gracefully handle
    # self-referential objects
    seen.add(obj_id)
    if isinstance(obj, dict):
        size += sum(get_object_size(v, seen) for v in obj.values())
        size += sum(get_object_size(k, seen) for k in obj.keys())
    elif hasattr(obj, "__dict__"):
        size += get_object_size(obj.__dict__, seen)
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        size += sum(get_object_size(i, seen) for i in obj)
    return size
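
# Example (reported sizes are platform-dependent; shown only for illustration):
# >>> get_object_size({"a": [1, 2, 3], "b": "text"}) > sys.getsizeof({})
# True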


def validate_dataset(paths_or_dataset, batch_size, buffer_size, engine, reader_kwargs):
    """
    Util function to load merlin.io.Dataset from disk

    Parameters
    ----------
    paths_or_dataset: Union[merlin.io.dataset.Dataset, str]
        Path to the dataset to load or a Merlin Dataset;
        if a Dataset is passed, it is returned as-is.
    batch_size: int
        Batch size for the dataloader.
    buffer_size: float
        Fraction of batches to load at once.
    engine: str
        File format of the dataset;
        possible values are: ["parquet", "csv", "csv-no-header"].
    reader_kwargs: dict
        Additional arguments for the specified reader.
    """
    try:
        from merlin.io.dataset import Dataset
    except ImportError:
        raise ValueError(
            "Merlin Core is necessary for this function, please install: merlin-core."
        )

    # TODO: put this in parent class and allow
    # torch dataset to leverage as well?

    # if a dataset was passed, just return it
    if isinstance(paths_or_dataset, Dataset):
        return paths_or_dataset

    # otherwise initialize a dataset
    # from paths or glob pattern
    if isinstance(paths_or_dataset, str):
        files = glob.glob(paths_or_dataset)
        _is_empty_msg = "Couldn't find file pattern {} in directory {}".format(
            *os.path.split(paths_or_dataset)
        )
    else:
        # TODO: some checking around attribute
        # error here?
        files = list(paths_or_dataset)
        _is_empty_msg = "paths_or_dataset list must contain at least one filename"

    assert isinstance(files, list)
    if len(files) == 0:
        raise ValueError(_is_empty_msg)

    # implement buffer size logic
    # TODO: IMPORTANT
    # should we divide everything by 3 to account
    # for extra copies laying around due to asynchronicity?
    reader_kwargs = reader_kwargs or {}
    if buffer_size >= 1:
        if buffer_size < batch_size:
            reader_kwargs["batch_size"] = int(batch_size * buffer_size)
        else:
            reader_kwargs["batch_size"] = buffer_size
    else:
        reader_kwargs["part_mem_fraction"] = buffer_size
    return Dataset(files, engine=engine, **reader_kwargs)
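
# Example (illustrative sketch; requires merlin-core and a hypothetical glob pattern):
# >>> dataset = validate_dataset(
# ...     "/data/0001/*.parquet",
# ...     batch_size=1024,
# ...     buffer_size=0.06,
# ...     engine="parquet",
# ...     reader_kwargs=None,
# ... )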


def _augment_schema(
    schema,
    cats=None,
    conts=None,
    labels=None,
    sparse_names=None,
    sparse_max=None,
    sparse_as_dense=False,
):
    """Selects the requested columns and tags them as categorical, continuous or target"""
    from merlin.schema import ColumnSchema, Tags

    schema = schema.select_by_name(conts + cats + labels)

    labels = [labels] if isinstance(labels, str) else labels
    for label in labels or []:
        schema[label] = schema[label].with_tags(Tags.TARGET)
    for col in cats or []:
        schema[col] = schema[col].with_tags(Tags.CATEGORICAL)
    for col in conts or []:
        schema[col] = schema[col].with_tags(Tags.CONTINUOUS)

    # Set the appropriate properties for the sparse_names/sparse_max/sparse_as_dense
    for col in sparse_names or []:
        cs = schema[col]
        properties = cs.properties
        if sparse_max and col in sparse_max:
            properties["value_count"] = {"max": sparse_max[col]}
        schema[col] = ColumnSchema(
            name=cs.name,
            tags=cs.tags,
            dtype=cs.dtype,
            is_list=True,
            is_ragged=not sparse_as_dense,
            properties=properties,
        )
    return schema
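
# Example (illustrative sketch; requires merlin-core and an existing `schema`,
# and the column names below are hypothetical):
# >>> schema = _augment_schema(
# ...     schema,
# ...     cats=["item_id", "category"],
# ...     conts=["price"],
# ...     labels=["item_id"],
# ...     sparse_names=["item_id", "category", "price"],
# ...     sparse_max={"item_id": 20, "category": 20, "price": 20},
# ... )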