#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
from typing import Any, List, Sequence, Tuple, Union
import numpy as np
import tensorflow as tf
from keras.utils.tf_inspect import getfullargspec
from packaging import version
from tensorflow.python import to_dlpack
from merlin.core.compat import cudf, cupy
from merlin.core.dispatch import DataFrameType
from merlin.io import Dataset
from merlin.models.tf.core.base import Block, ModelContext
from merlin.models.tf.typing import TabularData
from merlin.models.utils.misc_utils import filter_kwargs
if version.parse(tf.__version__) < version.parse("2.3.0"):
try:
from tfdlpack import from_dlpack
except ModuleNotFoundError as e:
message = "If using TensorFlow < 2.3.0, you must install tfdlpack-gpu extension library"
raise ModuleNotFoundError(message) from e
else:
from tensorflow.experimental.dlpack import from_dlpack
def get_output_sizes_from_schema(schema, batch_size=0, max_sequence_length=None):
sizes = {}
for feature in schema:
name = feature.name
if feature.is_list:
sizes[name] = tf.TensorShape(
[
batch_size,
max_sequence_length if max_sequence_length else feature.shape.dims[1].max,
]
)
elif feature.HasField("shape"):
sizes[name] = tf.TensorShape([batch_size] + [d.size for d in feature.shape.dim])
else:
sizes[name] = tf.TensorShape([batch_size, 1])
return sizes
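# A minimal usage sketch (the schema and its feature names below are hypothetical):
# for a schema with a scalar feature "user_id" and a list feature "item_ids" with
# max length 20,
#
#     get_output_sizes_from_schema(schema, batch_size=1024)
#
# would return {"user_id": TensorShape([1024, 1]), "item_ids": TensorShape([1024, 20])}.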
def calculate_batch_size_from_inputs(inputs):
input_shapes = {k: v.shape for k, v in inputs.items()}
batch_size = calculate_batch_size_from_input_shapes(input_shapes)
return batch_size
def calculate_batch_size_from_input_shapes(input_shapes):
    non_ragged_features = [
        k for k in input_shapes if not k.endswith("__values") and not k.endswith("__offsets")
    ]
if len(non_ragged_features) > 0:
batch_size = input_shapes[non_ragged_features[0]][0]
return batch_size
    ragged_features_offsets = [k for k in input_shapes if k.endswith("__offsets")]
if len(ragged_features_offsets) > 0:
batch_size = input_shapes[ragged_features_offsets[0]][0]
if batch_size is not None:
batch_size -= 1
return batch_size
return None
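# Example: when only ragged inputs are present, the batch size is recovered from
# the offsets, which have batch_size + 1 entries:
#
#     calculate_batch_size_from_input_shapes(
#         {"seq__values": tf.TensorShape([50]), "seq__offsets": tf.TensorShape([11])}
#     )  # -> 10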
def maybe_serialize_keras_objects(
self,
config,
maybe_serialize_keys,
):
for key in maybe_serialize_keys:
maybe_value = getattr(self, key, None)
if maybe_value:
if isinstance(maybe_value, dict):
config[key] = {
k: tf.keras.utils.serialize_keras_object(v) for k, v in maybe_value.items()
}
elif isinstance(maybe_value, (list, tuple)):
config[key] = [tf.keras.utils.serialize_keras_object(v) for v in maybe_value]
else:
config[key] = tf.keras.utils.serialize_keras_object(maybe_value)
return config
def maybe_deserialize_keras_objects(
    config,
    to_deserialize,
    deserialize_fn=tf.keras.utils.deserialize_keras_object,
    custom_objects=None,
):
    # Avoid a mutable default argument for custom_objects
    custom_objects = custom_objects or {}
if isinstance(to_deserialize, list):
to_deserialize = {k: deserialize_fn for k in to_deserialize}
for key, fn in to_deserialize.items():
maybe_val = config.get(key, None)
if maybe_val:
if isinstance(maybe_val, list):
config[key] = [fn(v, custom_objects=custom_objects) for v in maybe_val]
else:
config[key] = fn(maybe_val, custom_objects=custom_objects)
return config
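# Example round trip: serialize a layer into a config dict, then restore it.
#
#     layer = tf.keras.layers.ReLU()
#     config = {"activation": tf.keras.utils.serialize_keras_object(layer)}
#     config = maybe_deserialize_keras_objects(config, ["activation"])
#     # config["activation"] is now a ReLU layer instance again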
def rescore_false_negatives(
positive_item_ids: tf.Tensor,
neg_samples_item_ids: tf.Tensor,
negative_scores: tf.Tensor,
false_negatives_score: float,
):
"""
Zeroes the logits of accidental negatives.
"""
# Removing dimensions of size 1 from the shape of the item ids, if applicable
positive_item_ids = tf.cast(tf.squeeze(positive_item_ids), neg_samples_item_ids.dtype)
neg_samples_item_ids = tf.squeeze(neg_samples_item_ids)
# Reshapes positive and negative ids so that false_negatives_mask matches the scores shape
false_negatives_mask = tf.equal(
tf.expand_dims(positive_item_ids, -1), tf.expand_dims(neg_samples_item_ids, 0)
)
    # Setting a very small value for false negatives (accidental hits) so that they
    # have a negligible effect on the loss functions
negative_scores = tf.where(
false_negatives_mask,
tf.ones_like(negative_scores) * false_negatives_score,
negative_scores,
)
valid_negatives_mask = tf.logical_not(false_negatives_mask)
return tf.squeeze(negative_scores), valid_negatives_mask
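# Example: negative id 2 collides with the second example's positive id, so its
# score is replaced by false_negatives_score and flagged invalid in the mask:
#
#     pos = tf.constant([[1], [2]])                    # positive item id per example
#     neg = tf.constant([2, 3])                        # sampled negative item ids
#     scores = tf.constant([[0.5, 0.2], [0.9, 0.1]])   # scores of the negatives
#     scores, valid = rescore_false_negatives(pos, neg, scores, -100.0)
#     # scores -> [[0.5, 0.2], [-100.0, 0.1]], valid -> [[True, True], [False, True]]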
def extract_topk(
k: int,
predictions: tf.Tensor,
labels: tf.Tensor,
shuffle_ties: bool = False,
shuffle_ties_epsilon=1e-6,
seed=None,
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
"""Extracts top-k values of predictions, sorting the corresponding
labels accordingly.
Parameters
----------
k : int
Cut-off to extract top-k items
predictions : tf.Tensor
Tensor with the predictions per example
labels : tf.Tensor
Tensor with the labels per example
shuffle_ties : bool, optional
Adds a small random value to predictions to break ties if any, by default False
shuffle_ties_epsilon : float, optional
The maximum random value to be added to break ties (used only if shuffle_ties=True),
by default 1e-6
seed : int, optional
Random seed to use for tie breaking
Returns
-------
    Tuple[tf.Tensor, tf.Tensor, tf.Tensor]
        Returns a triple with the following tensors:
        (topk_predictions, topk_labels, label_relevant_counts).
The label_relevant_counts holds the total number of positive values per example,
as some metrics (e.g. recall) need that information, which is lost when we
extract top-k values
"""
# Computes the number of relevant items per row (before extracting only the top-k)
label_relevant_counts = tf.reduce_sum(labels, axis=-1)
# Limits k to the number of prediction scores
k = tf.minimum(k, tf.shape(predictions)[-1])
if shuffle_ties:
# Adds a small random value to break ties in the range [0,shuffle_ties_epsilon)
if seed is not None:
tf.random.set_seed(seed)
predictions = predictions + (
tf.random.uniform(tf.shape(predictions)) * shuffle_ties_epsilon
)
topk_predictions, topk_indices = tf.math.top_k(predictions, k)
topk_labels = gather_torch_like(labels, topk_indices, k)
return topk_predictions, topk_labels, label_relevant_counts
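# Example: with k=2 the two highest scores are kept and the labels are gathered in
# the same order; label_relevant_counts preserves the total positives per row:
#
#     preds = tf.constant([[0.2, 0.9, 0.1, 0.8]])
#     labels = tf.constant([[0.0, 1.0, 0.0, 1.0]])
#     topk_preds, topk_labels, rel = extract_topk(2, preds, labels)
#     # topk_preds -> [[0.9, 0.8]], topk_labels -> [[1.0, 1.0]], rel -> [2.0]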
def transform_label_to_onehot(labels, vocab_size):
return tf.one_hot(tf.reshape(labels, (-1,)), vocab_size)
def create_output_placeholder(scores, ks):
return tf.Variable(tf.zeros([tf.shape(scores)[0], len(ks)], tf.float32))
def gather_torch_like(labels, indices, max_k):
row_idxs = tf.repeat(tf.range(tf.shape(labels)[0]), max_k)
col_idx = tf.reshape(indices, tf.shape(row_idxs))
all_indices = tf.transpose(tf.stack([row_idxs, col_idx]))
labels = tf.reshape(tf.gather_nd(labels, all_indices), (tf.shape(labels)[0], max_k))
return labels
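# Example of the PyTorch-style gather along the last axis:
#
#     gather_torch_like(tf.constant([[10.0, 20.0, 30.0]]), tf.constant([[2, 0]]), 2)
#     # -> [[30.0, 10.0]]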
def batch_ref(inputs: Union[tf.Tensor, TabularData]):
"""Get hash-code of a tensor or a dictionary of tensors."""
if isinstance(inputs, tf.Tensor):
return hash(inputs.ref())
refs = []
keys = sorted(inputs.keys())
for key in keys:
refs.append(inputs[key].ref())
return hash(tuple(refs))
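# Example: the hash is independent of dictionary key order, since keys are sorted:
#
#     a, b = tf.constant([1.0]), tf.constant([2.0])
#     batch_ref({"x": a, "y": b}) == batch_ref({"y": b, "x": a})  # -> True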
def pack_df(gdf):
if isinstance(gdf, np.ndarray):
return gdf
elif hasattr(gdf, "to_dlpack") and callable(getattr(gdf, "to_dlpack")):
return gdf.to_dlpack()
elif hasattr(gdf, "to_numpy") and callable(getattr(gdf, "to_numpy")):
gdf = gdf.to_numpy()
if isinstance(gdf[0], list):
gdf = np.stack(gdf)
return gdf
return gdf.toDlpack()
def unpack_df(gdf):
if hasattr(gdf, "shape"):
return tf.convert_to_tensor(gdf)
return from_dlpack(gdf)
def df_to_tensor(gdf, dtype=None):
if gdf.empty:
return
# checks necessary because of this bug
# https://github.com/tensorflow/tensorflow/issues/42660
if len(gdf.shape) == 1 or gdf.shape[1] == 1:
dlpack = pack_df(gdf)
elif gdf.shape[0] == 1:
dlpack = pack_df(gdf.values[0])
else:
dlpack = pack_df(gdf.values.T)
# catch error caused by tf eager context
# not being initialized
try:
x = unpack_df(dlpack)
except AssertionError:
tf.random.uniform((1,))
x = unpack_df(dlpack)
# if rank is already two it is already in list format
    if gdf.shape[0] == 1 and tf.rank(x) != 2:
# batch size 1 so got squashed to a vector
x = tf.expand_dims(x, 0)
elif len(gdf.shape) == 1 or len(x.shape) == 1:
# sort of a generic check for any other
# len(shape)==1 case, could probably
# be more specific
x = tf.expand_dims(x, -1)
elif gdf.shape[1] > 1:
# matrix which means we had to transpose
# for the bug above, so untranspose
x = tf.transpose(x)
if dtype:
return tf.cast(x, dtype)
return x
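# A usage sketch, assuming a cuDF (or pandas-like) DataFrame is available:
#
#     df = cudf.DataFrame({"a": [1, 2], "b": [3, 4]})
#     t = df_to_tensor(df, dtype=tf.float32)  # -> 2x2 tf.Tensor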
def tensor_to_df(tensor, index=None, gpu=None):
if gpu is None:
gpu = cudf
if gpu:
# Note: It is not possible to convert Tensorflow tensors to the cudf dataframe
# directly using dlPack (as the example commented below) because cudf.from_dlpack()
# expects the 2D tensor to be in Fortran order (column-major), which is not
# supported by TF (https://github.com/rapidsai/cudf/issues/10754).
# df = cudf.from_dlpack(to_dlpack(tf.convert_to_tensor(embeddings)))
tensor_cupy = cupy.fromDlpack(to_dlpack(tf.convert_to_tensor(tensor)))
df = cudf.DataFrame(tensor_cupy)
df.columns = [str(col) for col in list(df.columns)]
        if index is None:
            index = cudf.RangeIndex(0, tensor.shape[0])
        # set_index returns a new DataFrame rather than mutating in place
        df = df.set_index(index)
else:
import pandas as pd
df = pd.DataFrame(tensor.numpy())
df.columns = [str(col) for col in list(df.columns)]
        if index is None:
            index = pd.RangeIndex(0, tensor.shape[0])
        df = df.set_index(index)
return df
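# Example (CPU path, pandas): converts a 2-D tensor to a DataFrame with string
# column names and a RangeIndex:
#
#     emb = tf.random.uniform((4, 8))
#     df = tensor_to_df(emb, gpu=False)  # -> 4x8 pandas DataFrame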
def add_epsilon_to_zeros(tensor: tf.Tensor, epsilon: float = 1e-24) -> tf.Tensor:
"""Replaces zeros by adding a small epsilon value to them.
This is useful to avoid inf and nan errors on math ops
like log().
Parameters
----------
tensor : tf.Tensor
Tensor to operate on
epsilon : float, optional
Small value to add to zeros, by default 1e-24
Returns
-------
tf.Tensor
The tensor without zeros
"""
return tf.where(tf.equal(tensor, 0.0), tensor + epsilon, tensor)
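# Example: guards log() against -inf on exact zeros:
#
#     tf.math.log(add_epsilon_to_zeros(tf.constant([0.0, 0.5])))
#     # -> [log(1e-24), log(0.5)] instead of [-inf, log(0.5)]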
def get_candidate_probs(
item_freq_probs: Union[tf.Tensor, Sequence], is_prob_distribution: bool = False
):
"""Returns the candidate probs after checking if
item_freq_probs is frequencies or probs and their
dtype and shape according to the item feature cardinality
Parameters:
----------
item_freq_probs : Union[tf.Tensor, Sequence]
A Tensor or list with item frequencies (if is_prob_distribution=False)
or with item probabilities (if is_prob_distribution=True)
is_prob_distribution: bool, optional
If True, the item_freq_probs should be a probability distribution of the items.
If False, the item frequencies is converted to probabilities
Returns
-------
A tensor with the item probability distributon
"""
item_freq_probs = tf.convert_to_tensor(item_freq_probs)
if is_prob_distribution:
tf.debugging.assert_type(
item_freq_probs, tf.float32, message="The item_weights should have tf.float32 dtype"
)
tf.debugging.assert_near(
tf.reduce_sum(item_freq_probs),
1.0,
message="The item_weights should be a probability distribution and sum to 1.0",
)
candidate_probs = item_freq_probs
else:
item_freq_probs = tf.cast(item_freq_probs, tf.float32)
candidate_probs = item_freq_probs / tf.reduce_sum(item_freq_probs)
return candidate_probs
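# Example: raw frequencies are normalized into a probability distribution:
#
#     get_candidate_probs([2.0, 6.0, 2.0])  # -> [0.2, 0.6, 0.2]
#     get_candidate_probs(tf.constant([0.2, 0.6, 0.2]), is_prob_distribution=True)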
@tf.keras.utils.register_keras_serializable(package="merlin.models")
class TensorInitializer(tf.keras.initializers.Initializer):
"""Initializer that returns a tensor (e.g. pre-trained
embeddings) set in the constructor
"""
    def __init__(self, weights: Union[tf.Tensor, Any], **kwargs):
self._weights = tf.convert_to_tensor(weights)
def __call__(self, shape: tf.TensorShape, dtype: tf.DType = None, **kwargs) -> tf.Tensor:
"""Returns a tensor object initialized with the tensor
set in the constructor.
Parameters
----------
shape : tf.TensorShape
Shape of the variable to be initialized
dtype : tf.DType, optional
Optional dtype of the tensor. Only numeric or boolean dtypes are
supported, by default None
Returns
-------
tf.Tensor
Returns the tensor set in the constructor
"""
tf.assert_equal(shape, self._weights.shape)
weights = self._weights
if dtype:
weights = tf.cast(self._weights, dtype)
return weights
    @classmethod
def from_dataset(cls, data: Union[Dataset, DataFrameType], **kwargs) -> "TensorInitializer":
if hasattr(data, "to_ddf"):
data = data.to_ddf().compute()
embeddings = df_to_tensor(data)
return cls(weights=embeddings, **kwargs)
    def get_config(self):  # To support serialization
return {"weights": self._weights.numpy()}
def call_layer(layer: tf.keras.layers.Layer, inputs, *args, **kwargs):
"""Calls a layer with the given inputs and filters kwargs. Returns the output"""
has_custom_call = getattr(layer, "_has_custom__call__", False)
_k = dict(cascade_kwargs_if_possible=True, argspec_fn=getfullargspec)
filtered_kwargs = filter_kwargs(kwargs, layer, **_k)
if not has_custom_call:
if isinstance(layer, tf.keras.layers.Lambda):
filtered_kwargs = filter_kwargs(kwargs, layer.function, **_k)
else:
# We need to check the call method on the type since when the model gets saved
# we can't infer the kwargs from using `layer.call` directly
call_fn = type(layer).call
filtered_kwargs = filter_kwargs(filtered_kwargs, call_fn, **_k)
return layer(inputs, *args, **filtered_kwargs)
def get_sub_blocks(blocks: Sequence[Block]) -> List[Block]:
"""Get all sub-blocks of given blocks, including blocks themselves, return a list of blocks
Traverse(Iterate) the model to check each block (sub_block) by BFS"""
result_blocks = set()
if not isinstance(blocks, (list, tuple)):
blocks = [blocks]
for block in blocks:
        # Iterate over all submodules (BFS), skipping ModelContext
        queue = collections.deque()
        if not isinstance(block, ModelContext):
            queue.append(block)
        while queue:
            current_module = queue.popleft()
            # Add each visited block, including the starting block itself
            result_blocks.add(current_module)
            for sub_module in current_module._flatten_modules(include_self=False, recursive=False):
                # Filter out ModelContext
                if type(sub_module) is not ModelContext:
                    queue.append(sub_module)
return list(result_blocks)
@tf.function
def list_col_to_ragged(values: tf.Tensor, offsets: tf.Tensor):
if offsets.dtype.is_floating:
offsets = tf.cast(offsets, tf.int32)
return tf.RaggedTensor.from_row_splits(values, offsets)
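# Example: offsets delimit each row's slice of the flat values tensor:
#
#     list_col_to_ragged(tf.constant([1, 2, 3, 4, 5]), tf.constant([0, 2, 5]))
#     # -> <tf.RaggedTensor [[1, 2], [3, 4, 5]]>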
def check_inputs_mask_compatible_shape(
inputs: Union[tf.Tensor, tf.RaggedTensor], mask: Union[tf.Tensor, tf.RaggedTensor]
):
"""Check if the shape and the type of the input and mask tensors are compatible.
Parameters
----------
inputs : Union[tf.Tensor, tf.RaggedTensor]
The input tensor, which can be either a dense or ragged tensor.
mask : Union[tf.Tensor, tf.RaggedTensor]
The mask tensor, which can be either a dense or ragged tensor.
Returns
-------
bool:
Returns True if the shape of the input and mask tensors are compatible, False otherwise.
Notes
-----
The function assumes that the `inputs` tensor has one more dimension than the `mask` tensor,
with the extra dimension typically related to the embeddings dimension.
"""
result = False
    if type(inputs) is type(mask) and (inputs.shape.as_list()[:-1] == mask.shape.as_list()):
if isinstance(inputs, tf.RaggedTensor):
result = tf.reduce_all(
tf.cast(inputs.row_lengths(), tf.int32) == tf.cast(mask.row_lengths(), tf.int32)
)
else:
result = True
return result
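# Example: a ragged input with an extra embedding dimension is compatible with a
# ragged mask that matches its row lengths:
#
#     x = tf.ragged.constant([[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0]]], ragged_rank=1)
#     mask = tf.ragged.constant([[True, True], [True]])
#     check_inputs_mask_compatible_shape(x, mask)  # -> True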