Source code for merlin.models.tf.transforms.features

#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import warnings
from itertools import combinations
from typing import Dict, List, Optional, Sequence, Tuple, Union

import tensorflow as tf
from keras.layers.preprocessing import preprocessing_utils as p_utils
from keras.utils import layer_utils

from merlin.models.config.schema import requires_schema
from merlin.models.tf.core.base import Block, PredictionOutput
from merlin.models.tf.core.combinators import ParallelBlock, TabularBlock
from merlin.models.tf.transforms.tensor import to_dense, to_sparse
from merlin.models.tf.typing import TabularData
from merlin.models.tf.utils import tf_utils
from merlin.models.tf.utils.tf_utils import list_col_to_ragged
from merlin.models.utils import schema_utils
from merlin.schema import ColumnSchema, Schema, Tags

ONE_HOT = p_utils.ONE_HOT
MULTI_HOT = p_utils.MULTI_HOT
COUNT = p_utils.COUNT


@tf.keras.utils.register_keras_serializable(package="merlin.models")
class FeaturesTensorTypeConversion(TabularBlock):
    """Base class to convert the tensor type of features provided in the schema

    Parameters
    ----------
    schema : Optional[Schema], optional
        The schema with the columns that will be transformed, by default None
    """

    def __init__(self, schema: Optional[Schema] = None, **kwargs):
        super().__init__(schema=schema, **kwargs)
        self.column_names = None

    def call(self, inputs: TabularData, **kwargs) -> TabularData:
        raise NotImplementedError("The call method need to be implemented by child classes")

    def compute_output_shape(self, input_shape):
        if not isinstance(input_shape, dict):
            return input_shape

        outputs = {}
        for name, val in input_shape.items():
            if self.has_schema and name in self.schema.column_names:
                col_schema_shape = self.schema[name].shape
                if col_schema_shape.is_list:
                    max_seq_length = col_schema_shape.dims[1].max
                    if max_seq_length is not None:
                        max_seq_length = int(max_seq_length)
                    shape = val.as_list()
                    shape[1] = max_seq_length
                    val = tf.TensorShape(shape)

            outputs[name] = val
        return outputs

    def compute_call_output_shape(self, input_shapes):
        return self.compute_output_shape(input_shapes)

    def compute_output_schema(self, input_schema: Schema) -> Schema:
        return input_schema


[docs]@Block.registry.register("to_sparse") @tf.keras.utils.register_keras_serializable(package="merlin.models") class ToSparse(FeaturesTensorTypeConversion): """Convert the features provided in the schema to sparse tensors. The other features are kept unchanged. """
[docs] def call(self, inputs: Union[TabularData, tf.RaggedTensor, tf.Tensor], **kwargs) -> TabularData: if isinstance(inputs, (tf.RaggedTensor, tf.SparseTensor)): return to_sparse(inputs) outputs = {} for name, val in inputs.items(): if not self.has_schema or name in self.schema.column_names: val = to_sparse(val) outputs[name] = val return outputs
[docs]@Block.registry.register("to_dense") @tf.keras.utils.register_keras_serializable(package="merlin.models") class ToDense(FeaturesTensorTypeConversion): """Convert the features provided in the schema to dense tensors. The other features are kept unchanged. """
[docs] def call( self, inputs: Union[TabularData, tf.RaggedTensor, tf.SparseTensor], **kwargs ) -> TabularData: if isinstance(inputs, (tf.RaggedTensor, tf.SparseTensor)): return to_dense(inputs) outputs = {} for name, val in inputs.items(): if self.has_schema: if name in self.schema.column_names: col_schema_shape = self.schema[name].shape if col_schema_shape.is_list: # TODO: Not sure if we really need to extract the # max seq length from the col schema to # have a dense tensor properly set max_seq_length = col_schema_shape.dims[1].max if max_seq_length is not None: max_seq_length = int(max_seq_length) val = to_dense(val, max_seq_length=max_seq_length) else: val = to_dense(val) else: val = to_dense(val) outputs[name] = val return outputs
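

# A minimal usage sketch for the two conversions above (hypothetical feature
# name and values; assumes TF eager mode and no schema, so every input column
# is converted):
#
#     ragged = tf.ragged.constant([[1, 2], [3]])
#     sparse = ToSparse()({"categories": ragged})["categories"]  # tf.SparseTensor
#     dense = ToDense()({"categories": ragged})["categories"]    # zero-padded tf.Tensor
#     # dense -> [[1, 2], [3, 0]]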
[docs]@Block.registry.register("prepare_lists") @tf.keras.utils.register_keras_serializable(package="merlin.models") class PrepareListFeatures(TabularBlock): """Prepares all list (multi-hot/sequential) features, so that they converted to tf.RaggedTensor or dense tf.Tensor based on the columns schema. It manages in particular the Merlin dataloader representation of ragged list features, which consists of two keys in the inputs dict suffixed by "__values" and "__offsets". For example, the "categories" ragged list feature is represented by the Merlin Dataloader as "categories__values" and "categories__offsets" keys and this block converts them to a single tf.RaggedTensor "categories". Parameters ---------- schema : Schema The features schema list_as_dense : bool, optional Whether to enforce all list features to be dense tf.Tensor (including the ragged ones), by default False """
[docs] def __init__(self, schema: Schema, list_as_dense: Optional[bool] = False, **kwargs): super().__init__(schema=schema, **kwargs) self.list_as_dense = list_as_dense
[docs] def call(self, inputs: TabularData, **kwargs) -> TabularData: if not inputs: return inputs outputs = {} if self.has_schema: for name in self.schema.column_names: val = None col_schema_shape = self.schema[name].shape if col_schema_shape.is_list: if name in inputs: val = inputs[name] if isinstance(val, tf.RaggedTensor): val = inputs[name] elif isinstance(val, tf.SparseTensor): val = tf.RaggedTensor.from_sparse(val) else: if Tags.EMBEDDING in self.schema[name].tags: # This fix ensures that for pre-trained embeddings # the last dim is defined (not None) in graph mode val = tf.reshape(val, (-1, col_schema_shape.dims[-1].max)) else: if col_schema_shape.is_ragged: if f"{name}__values" not in inputs or f"{name}__offsets" not in inputs: raise ValueError( f"The ragged list feature '{name}' is expected to be " f"represented by two features in the inputs: '{name}__values' " f"and '{name}__offsets', but they were not found." ) ragged_values = inputs[f"{name}__values"] if Tags.EMBEDDING in self.schema[name].tags: # This fix ensures that for pre-trained embeddings # the last dim is defined (not None) in graph mode ragged_values = tf.reshape( ragged_values, (-1, col_schema_shape.dims[-1].max) ) val = list_col_to_ragged( ragged_values, inputs[f"{name}__offsets"], ) del inputs[f"{name}__values"] del inputs[f"{name}__offsets"] else: raise ValueError(f"Feature '{name}' was not found in the inputs") if len(val.shape) == 2 and Tags.EMBEDDING not in self.schema[name].tags: # Only expands 2D sequential features if # they are not pre-trained embeeddings val = tf.expand_dims(val, axis=-1) if self.list_as_dense: val = to_dense(val, max_seq_length=col_schema_shape.dims[1].max) elif name in inputs: val = inputs[name] if val is not None: outputs[name] = val # Adding other inputs that might not be in the schema, # as they might be treated by other block for k, v in inputs.items(): if k not in outputs: outputs[k] = v return outputs
[docs] def compute_output_shape(self, input_shapes): if not input_shapes: return input_shapes output_shapes = {} if self.has_schema: for name in self.schema.column_names: col_schema_shape = self.schema[name].shape if col_schema_shape.is_list: if name not in input_shapes and f"{name}__offsets" in input_shapes: batch_size = input_shapes[f"{name}__offsets"][0] if batch_size is not None: # The length of offset is always batch size + 1 batch_size -= 1 seq_length = None if self.list_as_dense or not self.schema[name].shape.is_ragged: if col_schema_shape.dims[1] is None: raise Exception( f"List feature {name} does not have maximum" "length set in the schema ({col_schema_shape})" ) seq_length = int(col_schema_shape.dims[1].max) last_dim = 1 if Tags.EMBEDDING in self.schema[name].tags: last_dim = col_schema_shape[-1].max output_shapes[name] = tf.TensorShape([batch_size, seq_length, last_dim]) else: if ( len(input_shapes[name]) == 2 and Tags.EMBEDDING not in self.schema[name].tags ): output_shapes[name] = input_shapes[name] + (1,) elif name in input_shapes: output_shapes[name] = input_shapes[name] # Adding other inputs that might not be in the schema, # as they might be treated by other block for k, v in input_shapes.items(): if k.replace("__values", "").replace("__offsets", "") not in output_shapes: output_shapes[k] = v return output_shapes
[docs] def compute_call_output_shape(self, input_shapes): return self.compute_output_shape(input_shapes)
[docs] def get_config(self): config = super().get_config() config.update({"list_as_dense": self.list_as_dense}) return config
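

# A minimal sketch of the "__values"/"__offsets" handling above (hypothetical
# column name and values; the ColumnSchema arguments are an assumption about
# how a ragged list column is declared):
#
#     schema = Schema(
#         [ColumnSchema("categories", dtype="int64", is_list=True, is_ragged=True)]
#     )
#     inputs = {
#         "categories__values": tf.constant([10, 11, 12]),  # flattened values
#         "categories__offsets": tf.constant([0, 2, 3]),    # rows: [10, 11] and [12]
#     }
#     outputs = PrepareListFeatures(schema)(inputs)
#     # outputs["categories"] is a single tf.RaggedTensor of shape
#     # (batch_size, None, 1); the "__values"/"__offsets" keys are gone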
[docs]@Block.registry.register("prepare_features") @tf.keras.utils.register_keras_serializable(package="merlin.models") class PrepareFeatures(TabularBlock): """Prepares scalar and list (multi-hot/sequential) features to be used with a Merlin model. The transformations are applied only for features in the schema, the other features are kept the same. The scalar features are extended to be 2D (batch_size, 1) and list features are converted to either tf.RaggedTensor or dense tf.Tensor based on the columns schema. It manages in particular the Merlin dataloader representation of list features, which consists of two keys in the inputs dict suffixed by "__values" and "__offsets". For example, the "categories" list feature are represented by the Merlin Dataloader as "categories__values" and "categories__offsets" keys and this block converts them to a single ragged or dense tensor "categories". Parameters ---------- schema : Schema The features schema list_as_dense : bool, optional Whether to enforce all list features to be dense tf.Tensor (including the ragged ones), by default False """
[docs] def __init__(self, schema: Schema, list_as_dense: Optional[bool] = False, **kwargs): super().__init__(schema=schema, **kwargs) self.list_as_dense = list_as_dense self.prepare_lists = PrepareListFeatures(schema, list_as_dense)
[docs] def call( self, inputs: TabularData, targets: TabularData = None, **kwargs ) -> Union[TabularData, Tuple[TabularData, TabularData]]: # This might be needed for list targets prepared in the preprocessing # targets = self.prepare_lists(targets) inputs = self.prepare_lists(inputs) if not inputs: return inputs outputs = {} out_targets = {} if self.has_schema: # Preparing non-list features and targets for name in self.schema.column_names: if name in inputs: val = inputs[name] if not self.schema[name].shape.is_list and val.get_shape().rank == 1: # Expanding / setting last dim of non-list input features to be 2D val = tf.reshape(val, (-1, 1)) outputs[name] = val if isinstance(targets, dict) and name in targets: val = targets[name] if not self.schema[name].shape.is_list and val.get_shape().rank == 1: # Expanding / setting last dim of non-list target features to be 2D val = tf.reshape(val, (-1, 1)) out_targets[name] = val # Adding other inputs that might not be in the schema, # as they might be treated by other block for k, v in inputs.items(): if k not in outputs: outputs[k] = v if targets is not None: if isinstance(targets, dict): for k, v in targets.items(): if k not in out_targets: out_targets[k] = v if v.get_shape().rank == 1: # Expanding / setting last dim of non-list # target features to be 2D out_targets[k] = tf.reshape(v, (-1, 1)) elif targets.get_shape().rank == 1: # Expanding / setting last dim of non-list # target features to be 2D out_targets = tf.reshape(targets, (-1, 1)) else: out_targets = targets return (outputs, out_targets) return outputs
[docs] def compute_output_shape(self, input_shapes): output_shapes = self.prepare_lists.compute_output_shape(input_shapes) if self.has_schema: for name in self.schema.column_names: if not self.schema[name].shape.is_list and name in input_shapes: output_shapes[name] = tf.TensorShape([input_shapes[name][0], 1]) # Adding other inputs that might not be in the schema, # as they might be treated by other block for k, v in input_shapes.items(): if ( k not in output_shapes and not k.endswith("__values") and not k.endswith("__offsets") ): output_shapes[k] = v return output_shapes
[docs] def compute_call_output_shape(self, input_shapes): return self.compute_output_shape(input_shapes)
[docs] def get_config(self): config = super().get_config() config.update({"list_as_dense": self.list_as_dense}) return config
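

# A minimal sketch (hypothetical scalar column "user_age"; assumes `schema`
# declares it as a non-list column): scalar features are expanded from 1D to
# 2D, while list features are resolved via PrepareListFeatures.
#
#     inputs = {"user_age": tf.constant([25, 37])}   # shape (2,)
#     outputs = PrepareFeatures(schema)(inputs)
#     # outputs["user_age"] has shape (2, 1)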
@Block.registry.register("as-ragged") @tf.keras.utils.register_keras_serializable(package="merlin.models") class Rename(TabularBlock): """Rename input features Parameters ---------- renames: dict Mapping with new features names. schema: Schema, optional The `Schema` with input features, by default None """ def __init__( self, renames: Dict[Union[str, Tags], str], schema: Optional[Schema] = None, **kwargs ): super().__init__(schema=schema, **kwargs) self.renames = {} for key, val in renames.items(): if isinstance(key, Tags): if schema is None: raise ValueError("Schema must be provided to rename features with Tags") cols = schema.select_by_tag(key) if len(cols) != 1: raise ValueError(f"Tag: {key} does not uniquely identify a column") self.renames[cols.first.name] = val else: self.renames[key] = val def call(self, inputs: TabularData, **kwargs) -> TabularData: outputs = {} for key, val in inputs.items(): if key in self.renames: outputs[self.renames[key]] = val else: outputs[key] = val return outputs def compute_output_shape(self, input_shape): outputs = {} for key, val in input_shape.items(): if key in self.renames: outputs[self.renames[key]] = val else: outputs[key] = val return outputs def get_config(self): config = super().get_config() config.update({"renames": self.renames}) return config
[docs]@Block.registry.register_with_multiple_names("category_encoding") @tf.keras.utils.register_keras_serializable(package="merlin_models") @requires_schema class CategoryEncoding(TabularBlock): """ A preprocessing layer which encodes integer features. This layer provides options for condensing data into a categorical encoding. It accepts integer values as inputs, and it outputs a dense or sparse representation of those inputs. Only categorical features with "CATEGORICAL" as Tag can be transformed, and other features without this Tag would be discarded. It outputs a TabularData (Dict of features), where each feature is a 2D tensor computed based on the outputmode. Parameters: ---------- schema : Optional[Schema] The `Schema` with the input features output_mode: Optional[str] Specification for the output of the layer. Defaults to `"multi_hot"`. Values can be "one_hot", "multi_hot" or "count", configuring the transformation layer as follows: - "one_hot": Encodes each individual element in the input into a tensor with shape (batch_size, feature_cardinality), containing a 1 at the element index. It accepts both 1D tensor or 2D tensor if squeezable (i.e., if the last dimension is 1). - "multi_hot": Encodes each categorical value from the 2D input features into a multi-hot representation with shape (batch_size, feature_cardinality), with 1 at the indices present in the sequence and 0 for the other position. If 1D feature is provided, it behaves the same as "one_hot". - "count": also expects 2D tensor like `"multi_hot"` and outputs the features with shape (batch_size, feature_cardinality). But instead of returning "multi-hot" values, it outputs the frequency (count) of the number of items each item occurs in each sample. sparse: Optional[Boolean] If true, returns a `SparseTensor` instead of a dense `Tensor`. Defaults to `False`. Setting sparse=True is recommended for high-cardinality features, in order to avoid out-of-memory errors. count_weights: Optional[Union(tf.Tensor, tf.RaggedTensor, tf.SparseTensor)] count_weights is used to calculate weighted sum of times a token at that index appeared when `output_mode` is "count" """
[docs] def __init__( self, schema: Schema = None, output_mode="one_hot", sparse=False, count_weights=None, **kwargs, ): super().__init__(**kwargs) if schema: self.set_schema(schema.select_by_tag(Tags.CATEGORICAL)) self.sparse = sparse self.cardinalities = schema_utils.categorical_cardinalities(self.schema) # 'output_mode' must be one of (COUNT, ONE_HOT, MULTI_HOT) layer_utils.validate_string_arg( output_mode, allowable_strings=(COUNT, ONE_HOT, MULTI_HOT), layer_name="CategoryEncoding", arg_name="output_mode", ) self.output_mode = output_mode self.sparse = sparse if count_weights is not None: if self.output_mode != COUNT: raise ValueError( "`count_weights` is not used when `output_mode` is not " "`'count'`. Received `count_weights={count_weights}`." ) self.count_weights = p_utils.ensure_tensor(count_weights, self.compute_dtype) else: self.count_weights = None # Used to reshape 1D<->2Dtensors depending on the output_mode, when in graph mode self.features_2d_last_dim = {}
[docs] def call(self, inputs: TabularData, **kwargs) -> TabularData: outputs = {} for name, depth in self.cardinalities.items(): # Ensures the input is a Tensor, SparseTensor, then convert to Tensor if isinstance(inputs[name], tf.RaggedTensor): raise ValueError( f"All `CategoryEncoding` inputs should not contain a RaggedTensor. Received " f"{name} with type of {type(inputs[name])}" ) assertion_min_rank = tf.Assert( tf.logical_and( tf.greater_equal(tf.rank(inputs[name]), 2), tf.less_equal(tf.rank(inputs[name]), 3), ), [ "`CategoryEncoding` only accepts 2D (batch_size,1)" " or 3D (batch_size,seq_length,1) inputs, but got different " "rank for {name}" ], ) reshape_fn = ( tf.sparse.reshape if isinstance(inputs[name], tf.SparseTensor) else tf.reshape ) inputs[name] = reshape_fn(inputs[name], tf.shape(inputs[name])[:-1]) outputs[name] = p_utils.ensure_tensor(inputs[name]) if isinstance(outputs[name], tf.SparseTensor): max_value = tf.reduce_max(outputs[name].values) min_value = tf.reduce_min(outputs[name].values) else: max_value = tf.reduce_max(outputs[name]) min_value = tf.reduce_min(outputs[name]) condition = tf.logical_and( tf.greater(tf.cast(depth, max_value.dtype), max_value), tf.greater_equal(min_value, tf.cast(0, min_value.dtype)), ) assertion_valid_values = tf.Assert( condition, [ "Input values must be in the range 0 <= values < num_tokens" " with num_tokens={}".format(depth) ], ) with tf.control_dependencies([assertion_min_rank, assertion_valid_values]): outputs[name] = reshape_categorical_input_tensor_for_encoding( outputs[name], name, self.features_2d_last_dim, self.output_mode, ensure_1d_for_one_hot_mode=True, ) outputs[name] = p_utils.encode_categorical_inputs( outputs[name], output_mode=self.output_mode, depth=depth, dtype=self.compute_dtype, sparse=self.sparse, count_weights=self.count_weights, ) if not self.sparse and isinstance(outputs[name], tf.SparseTensor): outputs[name] = tf.sparse.to_dense(outputs[name]) return outputs
[docs] def compute_output_shape(self, input_shapes): batch_size = self.calculate_batch_size_from_input_shapes(input_shapes) outputs = {} for key in self.schema.column_names: if key in input_shapes: outputs[key] = tf.TensorShape([batch_size, self.cardinalities[key]]) input_shape = input_shapes[key] if input_shape[-1] == 1: input_shape = input_shape[:-1] if len(input_shape) == 2: self.features_2d_last_dim[key] = input_shape[-1] return outputs
[docs] def get_config(self): config = super().get_config() if self.schema: config["schema"] = schema_utils.schema_to_tensorflow_metadata_json(self.schema) config.update( { "output_mode": self.output_mode, "sparse": self.sparse, "count_weights": self.count_weights.numpy() if self.count_weights else None, } ) return config
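

# A minimal sketch of the encoding (hypothetical column; the "domain" property
# below is an assumption about how the feature cardinality is derived from the
# schema):
#
#     schema = Schema([
#         ColumnSchema(
#             "item_id",
#             dtype="int64",
#             tags=[Tags.CATEGORICAL],
#             properties={"domain": {"min": 0, "max": 4, "name": "item_id"}},
#         )
#     ])
#     encoder = CategoryEncoding(schema, output_mode="one_hot")
#     out = encoder({"item_id": tf.constant([[1], [3]])})
#     # out["item_id"] -> [[0., 1., 0., 0., 0.],
#     #                    [0., 0., 0., 1., 0.]]   # (batch_size, cardinality)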
@Block.registry.register_with_multiple_names("continuous-powers") @tf.keras.utils.register_keras_serializable(package="merlin.models") class ContinuousPowers(TabularBlock): """Trick from `Deep Neural Networks for YouTube Recommendations`""" def call(self, inputs: TabularData, **kwargs) -> TabularData: outputs: TabularData = {} for key, val in inputs.items(): outputs[key] = val if len(val.shape) < 2 or (len(val.shape) == 2 and val.shape[1] == 1): val_float = tf.cast(val, tf.float32) outputs[f"{key}_sqrt"] = tf.sqrt(val_float) outputs[f"{key}_pow"] = tf.pow(val_float, 2) return outputs def compute_output_shape(self, input_shape): output_shape = {} for key, val in input_shape.items(): output_shape[key] = val if len(val) < 2 or (len(val) == 2 and val[1] == 1): output_shape[f"{key}_sqrt"] = val output_shape[f"{key}_squared"] = val return output_shape
[docs]@Block.registry.register_with_multiple_names("label_to_onehot") @tf.keras.utils.register_keras_serializable(package="merlin_models") class ToOneHot(Block): """Transform the categorical encoded labels into a one-hot representation"""
[docs] def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] def compute_output_shape(self, input_shape): return input_shape
[docs] def call_outputs( self, outputs: PredictionOutput, training=True, **kwargs ) -> "PredictionOutput": targets, predictions = outputs.targets, outputs.predictions num_classes = tf.shape(predictions)[-1] targets = tf_utils.transform_label_to_onehot(targets, num_classes) return outputs.copy_with_updates(targets=targets)
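

# A sketch of the transformation applied to the targets above, expressed with
# plain TF ops (illustrative only; the block delegates to
# tf_utils.transform_label_to_onehot and infers num_classes from the
# predictions' last dimension):
#
#     targets = tf.constant([2, 0])   # categorical-encoded labels
#     num_classes = 4
#     tf.one_hot(targets, num_classes)
#     # -> [[0., 0., 1., 0.],
#     #     [1., 0., 0., 0.]]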
[docs]@Block.registry.register("hashed_cross") @tf.keras.utils.register_keras_serializable(package="merlin.models") class HashedCross(TabularBlock): """A transformation block which crosses categorical features using the "hashing trick". Conceptually, the transformation can be thought of as: hash(concatenation of features) % num_bins Example usage:: model_body = ParallelBlock( TabularBlock.from_schema(schema=cross_schema, pre=ml.HashedCross(cross_schema, num_bins = 1000)), is_input=True).connect(ml.MLPBlock([64, 32])) model = ml.Model(model_body, ml.BinaryClassificationTask("click")) Parameters ---------- schema : Schema The `Schema` with the input features num_bins : int Number of hash bins. output_mode: string Specification for the output of the layer. Defaults to "one_hot". Values can be "int", or "one_hot", configuring the layer as follows: - `"int"`: Return the integer bin indices directly. - `"one_hot"`: Encodes each individual element in the input into an array with the same size as `num_bins`, containing a 1 at the input's bin index. sparse: bool Boolean. Only applicable to `"one_hot"` mode. If True, returns a `SparseTensor` instead of a dense `Tensor`. Defaults to False. output_name: string Name of output feature, if not specified, default would be cross_<feature_name>_<feature_name>_<...> infer_num_bins: bool If True, num_bins would be set as the multiplier of feature cadinalities, if the multiplier is bigger than max_num_bins, then it would be cliped by max_num_bins max_num_bins: int Upper bound of num_bins, by default 100000. """
[docs] def __init__( self, schema: Schema, num_bins: int = None, sparse: bool = False, output_mode: str = "one_hot", output_name: str = None, infer_num_bins: bool = False, max_num_bins: int = 100000, **kwargs, ): super().__init__(**kwargs) if (not infer_num_bins) and (num_bins is None): raise ValueError( "num_bins is not given, and infer_num_bins is False, either of them " "is required, if you want to set fixed num_bins, then set infer_num_bins to False," " and set num_bins to an integer value, if you want to infer num_bins from the " "mulplier of feature cardinalities, at the same time you can set the max_num_bins." ) if not (output_mode in ["int", "one_hot", "multi_hot"]): raise ValueError("output_mode must be 'int', 'one_hot', or 'multi_hot'") self.schema = schema self.output_mode = output_mode self.sparse = sparse if not output_name: self.output_name = "cross" for name in sorted(schema.column_names): self.output_name = self.output_name + "_" + name else: self.output_name = output_name # Set num_bins if num_bins: self.num_bins = num_bins else: cardinalities = schema_utils.categorical_cardinalities(schema) inferred_num_bins_from_cardinalities_multiplier = 1.0 for cardinality in cardinalities.values(): inferred_num_bins_from_cardinalities_multiplier = ( inferred_num_bins_from_cardinalities_multiplier * cardinality ) self.num_bins = int(min(max_num_bins, inferred_num_bins_from_cardinalities_multiplier)) # Used to enforce the shape of 2D tensors depending on the output_mode, when in graph mode self.features_2d_last_dim = dict()
[docs] def call(self, inputs): self._check_at_least_two_inputs() _inputs = {} for name in self.schema.column_names: reshape_fn = ( tf.sparse.reshape if isinstance(inputs[name], tf.SparseTensor) else tf.reshape ) inputs[name] = reshape_fn(inputs[name], tf.shape(inputs[name])[:-1]) assertion_min_rank = tf.Assert( tf.logical_and( tf.greater_equal(tf.rank(inputs[name]), 1), tf.less_equal(tf.rank(inputs[name]), 2), ), [ "`HashedCross` only accepts 1D or 2D-shaped inputs, but got " f"different rank for {name}" ], ) with tf.control_dependencies([assertion_min_rank]): _inputs[name] = reshape_categorical_input_tensor_for_encoding( inputs[name], name, self.features_2d_last_dim, self.output_mode, ensure_1d_for_one_hot_mode=False, ) # Perform the cross and convert to dense output = tf.sparse.cross_hashed(list(_inputs.values()), self.num_bins) if self.output_mode == ONE_HOT: output = tf.sparse.reshape(output, [-1]) # Encode outputs. outputs = {} outputs[self.output_name] = p_utils.encode_categorical_inputs( output, output_mode=self.output_mode, depth=self.num_bins, sparse=self.sparse, ) if not self.sparse and isinstance(outputs[self.output_name], tf.SparseTensor): outputs[self.output_name] = tf.sparse.to_dense(outputs[self.output_name]) return outputs
[docs] def compute_output_shape(self, input_shapes): self._check_at_least_two_inputs() self._check_input_shape_and_type(input_shapes) # Save the last dim for 2D features so that we can reshape them in graph mode in call() for key in self.schema.column_names: input_shape = input_shapes[key] if input_shape[-1] == 1: input_shape = input_shape[:-1] if len(input_shape) == 2: self.features_2d_last_dim[key] = input_shape[-1] output_shape = {} batch_size = self.calculate_batch_size_from_input_shapes(input_shapes) output_shape[self.output_name] = p_utils.compute_shape_for_encode_categorical( shape=[batch_size, 1], output_mode=self.output_mode, depth=self.num_bins ) return output_shape
[docs] def get_config(self): config = super().get_config() config.update( { "num_bins": self.num_bins, "output_mode": self.output_mode, "sparse": self.sparse, "output_name": self.output_name, } ) if self.schema: config["schema"] = schema_utils.schema_to_tensorflow_metadata_json(self.schema) return config
def _check_at_least_two_inputs(self): if len(self.schema) < 2: raise ValueError( "`HashedCross` should be called on at least two features. " f"Received: {len(self.schema)} schemas" ) for name, column_schema in self.schema.column_schemas.items(): if Tags.CATEGORICAL not in column_schema.tags: warnings.warn( f"Please make sure input features to be categorical, detect {name} " "has no categorical tag" ) def _check_input_shape_and_type(self, inputs_shapes) -> TabularData: _inputs_shapes = [] for name in self.schema.column_names: shape = inputs_shapes[name] if shape[-1] == 1: shape = shape[:-1] if shape.rank not in [1, 2]: raise ValueError( "All `HashedCross` inputs should have 1D or 2D shape. " f"Received: input {name} with shape={shape}" ) _inputs_shapes.append(shape) if len(set([shape[0] for shape in _inputs_shapes])) > 1: raise ValueError( "All `HashedCross` inputs should have equal first dim (batch size). " f"Received: inputs={_inputs_shapes}" )
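

# A small sketch of the num_bins inference above: with infer_num_bins=True and
# no explicit num_bins, the bin count is the product of the crossed features'
# cardinalities, clipped at max_num_bins (hypothetical cardinalities below):
#
#     cardinalities = {"user_city": 1000, "item_category": 500}
#     inferred = 1000 * 500                # = 500_000
#     num_bins = min(100_000, inferred)    # clipped to the default max_num_bins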


def HashedCrossAll(
    schema: Schema,
    num_bins: int = None,
    infer_num_bins: bool = False,
    max_num_bins: int = 100000,
    max_level: int = 2,
    sparse: bool = False,
    output_mode: str = "one_hot",
    ignore_combinations: Sequence[Sequence[str]] = [],
) -> ParallelBlock:
    """Parallel block consisting of HashedCross blocks for all feature
    combinations of all levels, from 2 through max_level.

    Parameters
    ----------
    schema : Schema
        Schema of the input data.
    max_level : int
        Maximum number of levels: this function hash-crosses all combinations
        whose number of features is in the range [2, max_level], by default 2,
        which means it returns hashed cross blocks of all level-2 combinations
        of features within the schema.

        For example, if the schema contains 3 features (feature_1, feature_2
        and feature_3) and we call
        `level_3_cross = HashedCrossAll(schema=schema, max_level=3)`,
        then level_3_cross is a parallel block containing 4 hashed crosses of:
            1) feature_1 and feature_2
            2) feature_1 and feature_3
            3) feature_2 and feature_3
            4) feature_1, feature_2 and feature_3
    num_bins : int
        Number of hash bins. Note that num_bins applies to all hashed cross
        transformation blocks, whatever their level. If you want a different
        num_bins per cross, please use HashedCross to define each one
        separately.
    output_mode : str
        Specification for the output of the layer. Defaults to `"one_hot"`.
        Values can be `"int"` or `"one_hot"`, configuring the layer as follows:
        - `"int"`: Return the integer bin indices directly.
        - `"one_hot"`: Encodes each individual element in the input into an
          array the same size as `num_bins`, containing a 1 at the input's
          bin index.
    sparse : bool
        Only applicable to `"one_hot"` mode. If True, returns a `SparseTensor`
        instead of a dense `Tensor`. Defaults to False.
    infer_num_bins : bool
        If True, each num_bins is set to the product of the corresponding
        feature cardinalities; if the product is bigger than max_num_bins,
        it is clipped to max_num_bins
    max_num_bins : int
        Upper bound of num_bins for all hashed cross transformation blocks,
        by default 100000.
    ignore_combinations : Sequence[Sequence[str]]
        If provided, ignore feature combinations from this list. Useful to
        avoid interacting features whose combined value is always the same.
        For example, interacting these features is not useful when one of
        the features is dependent on the other:
        [["item_id", "item_category"], ["user_id", "user_birth_city", "user_age"]]

    Example usage::

        level_3_cross = HashedCrossAll(schema=schema, max_level=3, infer_num_bins=True)
    """

    if max_level < 2 or max_level > 3:
        raise ValueError(
            "Please make sure 1 < max_level < 4, because the cross transformation requires "
            "at least 2 features, and HashedCrossAll only supports at most 3 levels. If you "
            "want to cross more than 3 features, please use HashedCross. "
            f"Received: max_level = {max_level}"
        )

    if len(schema) < 2:
        raise ValueError(
            "`HashedCrossAll` should be called on at least two features. "
            f"Received: {len(schema)} features"
        )

    all_combinations = []
    for combination_tuple in combinations(schema.column_names, 2):
        all_combinations.append(set(combination_tuple))

    if max_level == 3:
        for combination_tuple in combinations(schema.column_names, 3):
            all_combinations.append(set(combination_tuple))

    if ignore_combinations:
        ignore_combinations_set = list([set(c) for c in ignore_combinations])
        all_combinations = list(
            [
                combination
                for combination in all_combinations
                if (combination not in ignore_combinations_set)
            ]
        )

    hashed_crosses = []
    for schema_names in all_combinations:
        hashed_crosses.append(
            HashedCross(
                schema=schema.select_by_name(schema_names),
                num_bins=num_bins,
                sparse=sparse,
                output_mode=output_mode,
                infer_num_bins=infer_num_bins,
                max_num_bins=max_num_bins,
            )
        )

    return ParallelBlock(hashed_crosses)
[docs]@Block.registry.register_with_multiple_names("to_target") @tf.keras.utils.register_keras_serializable(package="merlin_models") class ToTarget(Block): """Transform columns to targets"""
[docs] def __init__( self, schema: Schema, *target: Union[ColumnSchema, Schema, str, Tags], one_hot: bool = False, **kwargs, ): super().__init__(**kwargs) self.schema = schema self.target = target self.one_hot = one_hot
def _target_column_schemas(self): target_columns = {} for t in self.target: if isinstance(t, str): target_columns[t] = self.schema.select_by_name(t).first elif isinstance(t, ColumnSchema): target_columns[t.name] = t elif isinstance(t, Tags): selected_schema = self.schema.select_by_tag(t) for col_name, col_schema in selected_schema.column_schemas.items(): target_columns[col_name] = col_schema elif isinstance(t, Schema): for col_name, col_schema in t.column_schemas.items(): target_columns[col_name] = col_schema else: raise ValueError(f"Unsupported target type {type(t)}") return target_columns
[docs] def call( self, inputs: TabularData, targets=None, **kwargs ) -> Tuple[TabularData, Union[TabularData, tf.Tensor, None]]: target_columns = self._target_column_schemas() outputs = {} for name in inputs: if name not in target_columns: outputs[name] = inputs[name] continue if isinstance(targets, dict): _target = targets.get(name, inputs[name]) if self.one_hot: _target = self._to_one_hot(name, _target) targets[name] = _target else: _target = inputs[name] if self.one_hot: _target = self._to_one_hot(name, _target) targets = _target return outputs, targets
def _to_one_hot(self, name, target): num_classes = schema_utils.categorical_cardinalities(self.schema)[name] one_hot = tf.one_hot(target, num_classes, dtype=target.dtype) return tf.squeeze(one_hot)
[docs] def compute_output_shape(self, input_shape): return input_shape
[docs] def compute_output_schema(self, input_schema: Schema) -> Schema: target_columns = self._target_column_schemas() output_column_schemas = {} for col_name, col_schema in input_schema.column_schemas.items(): if col_name in target_columns: output_column_schemas[col_name] = col_schema.with_tags(Tags.TARGET) else: output_column_schemas[col_name] = col_schema return Schema(column_schemas=output_column_schemas)
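

# A minimal usage sketch (hypothetical "click" column, assumed to be present in
# the schema): the selected columns are removed from the features dict and
# returned as targets.
#
#     to_target = ToTarget(schema, "click")
#     features, targets = to_target(inputs)
#     # "click" is no longer in `features`; `targets` holds its values.
#     # compute_output_schema() additionally tags the column with Tags.TARGET.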


def reshape_categorical_input_tensor_for_encoding(
    input, feat_name, features_2d_last_dim, output_mode, ensure_1d_for_one_hot_mode=True
):
    output = input
    reshape_fn = tf.sparse.reshape if isinstance(output, tf.SparseTensor) else tf.reshape

    if ensure_1d_for_one_hot_mode and output_mode == ONE_HOT:
        if features_2d_last_dim.get(feat_name, None) == 1 or input.get_shape()[-1] == 1:
            output = reshape_fn(output, [-1])
        elif feat_name in features_2d_last_dim or (
            input.get_shape()[-1] is not None and input.get_shape().rank == 2
        ):
            raise ValueError(
                "One-hot accepts input tensors that are squeezable to 1D, but received"
                f" a tensor with shape: {input.get_shape()}"
            )
    else:
        if feat_name in features_2d_last_dim or (
            input.get_shape()[-1] is not None and len(input.get_shape()) == 2
        ):
            # Ensures that the shape is known to avoid errors in graph mode
            new_shape = (-1, features_2d_last_dim.get(feat_name, None) or input.get_shape()[-1])
            output = reshape_fn(output, new_shape)
        else:
            expand_dims_fn = (
                tf.sparse.expand_dims if isinstance(output, tf.SparseTensor) else tf.expand_dims
            )
            output = expand_dims_fn(output, 1)

    return output
[docs]@tf.keras.utils.register_keras_serializable(package="merlin.models") class BroadcastToSequence(Block): """Broadcast context features to match the timesteps of sequence features. This layer supports mask propagation. If the sequence features have a mask. The context features being broadcast will inherit the mask. Parameters ---------- context_schema : Schema The schema representing contextual features to be broadcast sequence_schema : Schema The schema representing sequence features """
[docs] def __init__(self, context_schema: Schema, sequence_schema: Schema, **kwargs): super().__init__(**kwargs) self.context_schema = context_schema self.sequence_schema = sequence_schema
[docs] def call(self, inputs: TabularData) -> TabularData: inputs = self._broadcast(inputs) return inputs
def _check_sequence_features(self, inputs: TabularData): sequence_features = self.sequence_schema.column_names if len(sequence_features) == 0: return not_found_seq_features = set(sequence_features).difference(set(inputs.keys())) if len(not_found_seq_features) > 0: raise ValueError( f"Some sequential features were not found in the inputs: {not_found_seq_features}" ) sequence_length = None sequence_is_ragged = None for k, v in inputs.items(): if k in sequence_features: if isinstance(v, tf.RaggedTensor): if sequence_is_ragged is False: raise ValueError( "Sequential features must all be ragged or all dense, not both." ) new_sequence_length = v.row_lengths() sequence_is_ragged = True else: if sequence_is_ragged is True: raise ValueError( "Sequential features must all be ragged or all dense, not both." ) new_sequence_length = [v.shape[1]] sequence_is_ragged = False # check sequences lengths match if sequence_length is not None: sequence_lengths_equal = tf.math.reduce_all( tf.equal(new_sequence_length, sequence_length) ) tf.Assert( sequence_lengths_equal, [ "Sequential features must share the same sequence lengths", (sequence_length, new_sequence_length), ], ) sequence_length = new_sequence_length def _check_context_features(self, inputs: TabularData): context_features = self.context_schema.column_names if len(context_features) == 0: return not_found_seq_features = set(context_features).difference(set(inputs.keys())) if len(not_found_seq_features) > 0: raise ValueError( f"Some contextual features were not found in the inputs: {not_found_seq_features}" ) for k in context_features: v = inputs[k] if not isinstance(v, tf.Tensor): raise ValueError(f"A contextual feature ({k}) should be a dense tf.Tensor") if len(v.shape) >= 3: raise ValueError( f"A contextual feature ({k}) should be a 1D or " "2D tf.Tensor: {v.shape}." ) @tf.function def _broadcast(self, inputs): self._check_sequence_features(inputs) self._check_context_features(inputs) sequence_features = self.sequence_schema.column_names context_features = self.context_schema.column_names if len(sequence_features) == 0 and len(context_features) == 0: return inputs sequence_features_values = list( [inputs[k] for k in sequence_features if inputs[k] is not None] ) if len(sequence_features_values) == 0: return inputs template_seq_feature_value = sequence_features_values[0] non_seq_target = {} for fname in context_features: if inputs[fname] is None: continue if isinstance(template_seq_feature_value, tf.RaggedTensor): new_value = inputs[fname] while len(new_value.shape) < len(template_seq_feature_value.shape): new_value = tf.expand_dims(new_value, 1) # Here broadcast the context feature using the same shape # of a 3D ragged sequential feature with compatible # So that the context feature shape becomes (batch size, seq length, feature dim) non_seq_target[fname] = ( tf.ones_like(template_seq_feature_value[..., :1], dtype=new_value.dtype) * new_value ) else: shape = inputs[fname].shape sequence_length = template_seq_feature_value.shape[1] target_shape = shape[:1] + [sequence_length] + shape[1:] non_seq_target[fname] = tf.broadcast_to( tf.expand_dims(inputs[fname], 1), target_shape ) inputs = {**inputs, **non_seq_target} return inputs
[docs] def compute_output_shape( self, input_shape: Dict[str, tf.TensorShape] ) -> Dict[str, tf.TensorShape]: sequence_length = None for k in input_shape: if k in self.sequence_schema.column_names: sequence_length = input_shape[k][1] break context_shapes = {} for k in input_shape: if k in self.context_schema.column_names: rest_shape = input_shape[k][1:] context_shapes[k] = ( input_shape[k][:1] + tf.TensorShape([sequence_length]) + rest_shape ) output_shape = {**input_shape, **context_shapes} return output_shape
[docs] def compute_mask(self, inputs: TabularData, mask: Optional[TabularData] = None): if mask is None: return None # find the sequence mask sequence_mask = None for k in mask: if mask[k] is not None and k in self.sequence_schema.column_names: sequence_mask = mask[k] break # no sequence mask found if sequence_mask is None: return mask # set the mask value for those that are none masks_context = {} for k in mask: if mask[k] is None and k in self.context_schema.column_names: masks_context[k] = sequence_mask new_mask = {**mask, **masks_context} return new_mask
[docs] def get_config(self): config = super().get_config() config["context_schema"] = schema_utils.schema_to_tensorflow_metadata_json( self.context_schema ) config["sequence_schema"] = schema_utils.schema_to_tensorflow_metadata_json( self.sequence_schema ) return config
[docs] @classmethod def from_config(cls, config): context_schema = schema_utils.tensorflow_metadata_json_to_schema( config.pop("context_schema") ) sequence_schema = schema_utils.tensorflow_metadata_json_to_schema( config.pop("sequence_schema") ) return cls(context_schema, sequence_schema, **config)
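

# A minimal sketch of the broadcast (hypothetical columns; assumes "item_ids"
# is in sequence_schema and "user_age" in context_schema):
#
#     inputs = {
#         "item_ids": tf.ragged.constant([[[1], [2]], [[3]]]),  # (2, None, 1)
#         "user_age": tf.constant([[25], [37]]),                # (2, 1)
#     }
#     out = BroadcastToSequence(context_schema, sequence_schema)(inputs)
#     # out["user_age"] becomes a ragged (2, None, 1) tensor, repeating each
#     # context value along its row's sequence length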


def expected_input_cols_from_schema(
    schema: Schema, inputs: Optional[TabularData] = None
) -> List[str]:
    """Returns the columns/keys that are expected to be present in the inputs
    dictionary based on the schema. The particular cases are ragged
    list/multi-hot columns, which are represented by two keys in the inputs
    dict, suffixed by "__values" and "__offsets". For example, the "categories"
    ragged list feature is represented by the Merlin Dataloader as the
    "categories__values" and "categories__offsets" keys, which are converted
    by Merlin Models into a single ragged/dense tensor.

    Parameters
    ----------
    schema : Schema
        The schema with the expected features
    inputs : Optional[TabularData]
        If provided, checks if ragged list features have already been converted
        from the (__values, __offsets) dict representation

    Returns
    -------
    List[str]
        Returns a list with the expected columns in the input dictionary
    """
    schema = schema.remove_by_tag(Tags.TARGET)

    expected_features = []
    for name in schema.column_names:
        if (
            schema[name].shape.is_list
            and schema[name].shape.is_ragged
            and (inputs is None or name not in inputs)
        ):
            expected_features.extend([f"{name}__values", f"{name}__offsets"])
        else:
            expected_features.append(name)

    return expected_features
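

# A minimal sketch (hypothetical schema with a scalar "user_age" column and a
# ragged list "categories" column):
#
#     expected_input_cols_from_schema(schema)
#     # -> ["user_age", "categories__values", "categories__offsets"]
#     expected_input_cols_from_schema(schema, inputs={"categories": ragged_tensor})
#     # -> ["user_age", "categories"]   # already converted, no suffixed keys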