#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
import tensorflow as tf
from tensorflow.python.feature_column import feature_column_v2 as fc

# pylint has issues with TF array ops, so disable checks until fixed:
# https://github.com/PyCQA/pylint/issues/3613
# pylint: disable=no-value-for-parameter, unexpected-keyword-arg


def _sort_columns(feature_columns):
    return sorted(feature_columns, key=lambda col: col.name)


def _validate_numeric_column(feature_column):
    if len(feature_column.shape) > 1:
        return "Matrix numeric features are not allowed, found feature {} with shape {}".format(
            feature_column.key, feature_column.shape
        )


def _validate_categorical_column(feature_column):
    if not isinstance(feature_column, fc.IdentityCategoricalColumn):
        return (
            "Only acceptable categorical columns for feeding "
            "embeddings are identity, found column {} of type {}. "
            "Consider using NVTabular online preprocessing to perform "
            "categorical transformations".format(feature_column.name, type(feature_column).__name__)
        )
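

# Illustrative note (not part of the original module): a column built with
# tf.feature_column.categorical_column_with_identity("b", 100) passes the check
# above, while hash- or vocabulary-based categorical columns are rejected; an
# offline categorical encoding such as NVTabular's Categorify op produces the
# contiguous integer ids an identity column expects.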


def _validate_dense_feature_columns(feature_columns):
    _errors = []
    for feature_column in feature_columns:
        if isinstance(feature_column, fc.CategoricalColumn):
            if not isinstance(feature_column, fc.BucketizedColumn):
                _errors.append(
                    "All feature columns must be dense, found categorical "
                    "column {} of type {}. Please wrap categorical columns "
                    "in embedding or indicator columns before passing".format(
                        feature_column.name, type(feature_column).__name__
                    )
                )
            else:
                _errors.append(
                    "Found bucketized column {}. DenseFeatures layer "
                    "cannot apply bucketization preprocessing. Consider using "
                    "NVTabular to do preprocessing offline".format(feature_column.name)
                )
        elif isinstance(feature_column, (fc.EmbeddingColumn, fc.IndicatorColumn)):
            _errors.append(_validate_categorical_column(feature_column.categorical_column))
        elif isinstance(feature_column, fc.NumericColumn):
            _errors.append(_validate_numeric_column(feature_column))

    _errors = list(filter(lambda e: e is not None, _errors))
    if len(_errors) > 0:
        msg = "Found issues with columns passed to DenseFeatures:\n\t"
        msg += "\n\t".join(_errors)
        raise ValueError(msg)


def _validate_stack_dimensions(feature_columns):
    dims = []
    for feature_column in feature_columns:
        if isinstance(feature_column, fc.EmbeddingColumn):
            dimension = feature_column.dimension
        elif isinstance(feature_column, fc.IndicatorColumn):
            dimension = feature_column.categorical_column.num_buckets
        else:
            dimension = feature_column.shape[0]
        dims.append(dimension)

    dim0 = dims[0]
    if not all(dim == dim0 for dim in dims[1:]):
        dims = ", ".join(map(str, dims))
        raise ValueError(
            "'stack' aggregation requires all categorical "
            "embeddings and continuous features to have same "
            "size. Found dimensions {}".format(dims)
        )
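

# Illustrative example of the constraint above: an embedding_column with
# dimension 8, an indicator_column over 8 buckets, and a numeric_column of
# shape (8,) can all be stacked together; mixing dimensions 8 and 16 raises
# the ValueError.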


def _categorical_embedding_lookup(table, inputs, feature_name, combiner):
    # check for sparse embeddings by name
    # build values and nnz tensors into ragged array, convert to sparse
    if isinstance(inputs[feature_name], tuple):
        values = inputs[feature_name][0][:, 0]
        row_lengths = inputs[feature_name][1][:, 0]
        x = tf.RaggedTensor.from_row_lengths(values, row_lengths).to_sparse()

        # use ragged array for sparse embedding lookup.
        # note we're using safe_embedding_lookup_sparse to handle empty rows
        # ( https://github.com/NVIDIA/NVTabular/issues/389 )
        embeddings = tf.nn.safe_embedding_lookup_sparse(table, x, None, combiner=combiner)
    else:
        embeddings = tf.gather(table, inputs[feature_name][:, 0])
    return embeddings
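

# Illustrative example of the tuple input format handled above: for a batch of
# three samples with 2, 1, and 3 categories respectively,
#   values      -> [[3], [7], [2], [0], [5], [9]]  (shape (total_nnz, 1))
#   row_lengths -> [[2], [1], [3]]                 (shape (batch_size, 1))
# from_row_lengths then yields the ragged batch [[3, 7], [2], [0, 5, 9]], whose
# sparse form feeds safe_embedding_lookup_sparse.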


def _handle_continuous_feature(inputs, feature_column):
    if feature_column.shape[0] > 1:
        x = inputs[feature_column.name]
        if isinstance(x, tuple):
            x = x[0]
        return tf.reshape(x, (-1, feature_column.shape[0]))
    return inputs[feature_column.name]
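

# Illustrative example: a numeric_column with shape (3,) may arrive as a flat
# values tensor (or as the first element of a (values, nnzs) tuple from a data
# loader); either way it is reshaped to (batch_size, 3) before use.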


class DenseFeatures(tf.keras.layers.Layer):
    """
    Layer which maps a dictionary of input tensors to a dense, continuous
    vector digestible by a neural network. Meant to reproduce the API exposed
    by `tf.keras.layers.DenseFeatures` while reducing overhead for the
    case of one-hot categorical and scalar numeric features.

    Uses TensorFlow `feature_column` to represent inputs to the layer, but
    does not perform any preprocessing associated with those columns. As such,
    it should only be passed `numeric_column` objects and their subclasses,
    `embedding_column` and `indicator_column`. Preprocessing functionality
    should be moved to NVTabular.

    For multi-hot categorical or vector continuous data, represent the data for
    a feature with a dictionary entry `"<feature_name>__values"` corresponding
    to the flattened array of all values in the batch. For multi-hot categorical
    data, there should be a corresponding `"<feature_name>__nnzs"` entry that
    describes how many categories are present in each sample (and so has length
    `batch_size`).

    Note that categorical columns should be wrapped in embedding or
    indicator columns first, consistent with the API used by
    `tf.keras.layers.DenseFeatures`.

    Example usage::

        column_a = tf.feature_column.numeric_column("a", (1,))
        column_b = tf.feature_column.categorical_column_with_identity("b", 100)
        column_b_embedding = tf.feature_column.embedding_column(column_b, 4)

        inputs = {
            "a": tf.keras.Input(name="a", shape=(1,), dtype=tf.float32),
            "b": tf.keras.Input(name="b", shape=(1,), dtype=tf.int64)
        }
        x = DenseFeatures([column_a, column_b_embedding])(inputs)

    Parameters
    ----------
    feature_columns : list of `tf.feature_column`
        feature columns describing the inputs to the layer
    aggregation : str in ("concat", "stack")
        how to combine the embeddings from multiple features
    """

    def __init__(self, feature_columns, aggregation="concat", name=None, **kwargs):
        # sort feature columns to make layer independent of column order
        feature_columns = _sort_columns(feature_columns)
        _validate_dense_feature_columns(feature_columns)

        if aggregation == "stack":
            _validate_stack_dimensions(feature_columns)
        elif aggregation != "concat":
            raise ValueError(
                "Unrecognized aggregation {}, must be stack or concat".format(aggregation)
            )

        self.feature_columns = feature_columns
        self.aggregation = aggregation
        super(DenseFeatures, self).__init__(name=name, **kwargs)

    def build(self, input_shapes):
        assert all(shape[1] == 1 for shape in input_shapes.values() if not isinstance(shape, tuple))
        assert all(shape[0][1] == 1 for shape in input_shapes.values() if isinstance(shape, tuple))

        self.embedding_tables = {}
        for feature_column in self.feature_columns:
            if isinstance(feature_column, fc.NumericColumn):
                continue

            feature_name = feature_column.categorical_column.key
            num_buckets = feature_column.categorical_column.num_buckets
            if isinstance(feature_column, fc.EmbeddingColumn):
                self.embedding_tables[feature_name] = self.add_weight(
                    name="{}/embedding_weights".format(feature_name),
                    trainable=True,
                    initializer="glorot_normal",
                    shape=(num_buckets, feature_column.dimension),
                )
            else:
                # indicator column: fixed identity table implements one-hot encoding
                self.embedding_tables[feature_name] = self.add_weight(
                    name="{}/embedding_weights".format(feature_name),
                    trainable=False,
                    initializer=tf.constant_initializer(np.eye(num_buckets)),
                    shape=(num_buckets, num_buckets),
                )
        self.built = True

    def call(self, inputs):
        features = []
        for feature_column in self.feature_columns:
            if isinstance(feature_column, fc.NumericColumn):
                x = _handle_continuous_feature(inputs, feature_column)
                features.append(x)
            else:
                feature_name = feature_column.categorical_column.name
                table = self.embedding_tables[feature_name]
                combiner = getattr(feature_column, "combiner", "sum")
                embeddings = _categorical_embedding_lookup(table, inputs, feature_name, combiner)
                features.append(embeddings)

        if self.aggregation == "stack":
            return tf.stack(features, axis=1)
        return tf.concat(features, axis=1)
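
    # Illustrative note on output shapes: with "concat" aggregation the result
    # is (batch_size, sum of feature dims); with "stack" it is
    # (batch_size, num_features, dim), dim being the common size validated at
    # construction. Multi-hot features arrive as (values, nnzs) tuples, as
    # described in the class docstring.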

    def compute_output_shape(self, input_shapes):
        input_shape = list(input_shapes.values())[0]
        dims = []
        for feature_column in self.feature_columns:
            if isinstance(feature_column, fc.EmbeddingColumn):
                dims.append(feature_column.dimension)
            elif isinstance(feature_column, fc.IndicatorColumn):
                dims.append(feature_column.categorical_column.num_buckets)
            else:
                dims.append(feature_column.shape[0])

        if self.aggregation == "concat":
            return (input_shape[0], sum(dims))
        # "stack" aggregation: dimensions were validated equal at construction
        return (input_shape[0], len(dims), dims[0])

    def get_config(self):
        return {
            "feature_columns": self.feature_columns,
            "aggregation": self.aggregation,
        }


def _validate_linear_feature_columns(feature_columns):
    _errors = []
    for feature_column in feature_columns:
        if isinstance(feature_column, (fc.EmbeddingColumn, fc.IndicatorColumn)):
            _errors.append(
                "Only pass categorical or numeric columns to LinearFeatures "
                "layer, found column {} of type {}".format(
                    feature_column.name, type(feature_column).__name__
                )
            )
        elif isinstance(feature_column, fc.NumericColumn):
            _errors.append(_validate_numeric_column(feature_column))
        else:
            _errors.append(_validate_categorical_column(feature_column))

    _errors = list(filter(lambda e: e is not None, _errors))
    if len(_errors) > 0:
        msg = "Found issues with columns passed to LinearFeatures:\n\t"
        msg += "\n\t".join(_errors)
        raise ValueError(msg)


# TODO: is there a clean way to combine these two layers into one, maybe with
# a "sum" aggregation? The major differences seem to be whether categorical
# columns are wrapped in embeddings and the numeric matmul, both of which seem
# reasonably easy to check. At the very least, subclassing should be possible.
class LinearFeatures(tf.keras.layers.Layer):
    """
    Layer which implements a linear combination of one-hot categorical
    and scalar numeric features. Based on the "wide" branch of the Wide & Deep
    network architecture.

    Uses TensorFlow ``feature_column``s to represent inputs to the layer, but
    does not perform any preprocessing associated with those columns. As such,
    it should only be passed ``numeric_column`` and
    ``categorical_column_with_identity``. Preprocessing functionality should
    be moved to NVTabular.

    Also note that, unlike DenseFeatures, categorical columns should
    NOT be wrapped in embedding or indicator columns first.

    Example usage::

        column_a = tf.feature_column.numeric_column("a", (1,))
        column_b = tf.feature_column.categorical_column_with_identity("b", 100)

        inputs = {
            "a": tf.keras.Input(name="a", shape=(1,), dtype=tf.float32),
            "b": tf.keras.Input(name="b", shape=(1,), dtype=tf.int64)
        }
        x = LinearFeatures([column_a, column_b])(inputs)

    Parameters
    ----------
    feature_columns : list of tf.feature_column
        feature columns describing the inputs to the layer
    """

    def __init__(self, feature_columns, name=None, **kwargs):
        feature_columns = _sort_columns(feature_columns)
        _validate_linear_feature_columns(feature_columns)

        self.feature_columns = feature_columns
        super(LinearFeatures, self).__init__(name=name, **kwargs)

    def build(self, input_shapes):
        assert all(shape[1] == 1 for shape in input_shapes.values() if not isinstance(shape, tuple))
        assert all(shape[0][1] == 1 for shape in input_shapes.values() if isinstance(shape, tuple))

        # TODO: I've tried combining all the categorical tables into a single
        # giant lookup op, but adding the offsets to the lookup indices at
        # call time ends up being much slower due to kernel overhead. Still,
        # a better (and probably custom) solution would be desirable.
        numeric_kernel_dim = 0
        self.embedding_tables = {}
        for feature_column in self.feature_columns:
            if isinstance(feature_column, fc.NumericColumn):
                numeric_kernel_dim += feature_column.shape[0]
                continue

            self.embedding_tables[feature_column.key] = self.add_weight(
                name="{}/embedding_weights".format(feature_column.key),
                initializer="zeros",
                trainable=True,
                shape=(feature_column.num_buckets, 1),
            )
        if numeric_kernel_dim > 0:
            self.embedding_tables["numeric"] = self.add_weight(
                name="numeric/embedding_weights",
                initializer="zeros",
                trainable=True,
                shape=(numeric_kernel_dim, 1),
            )

        self.bias = self.add_weight(name="bias", initializer="zeros", trainable=True, shape=(1,))
        self.built = True

    def call(self, inputs):
        x = self.bias
        numeric_inputs = []
        for feature_column in self.feature_columns:
            if isinstance(feature_column, fc.NumericColumn):
                numeric_inputs.append(_handle_continuous_feature(inputs, feature_column))
            else:
                table = self.embedding_tables[feature_column.key]
                embeddings = _categorical_embedding_lookup(table, inputs, feature_column.key, "sum")
                x = x + embeddings

        if len(numeric_inputs) > 0:
            numerics = tf.concat(numeric_inputs, axis=1)
            x = x + tf.matmul(numerics, self.embedding_tables["numeric"])
        return x
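
    # Illustrative note: for a numeric column a and an identity categorical
    # column b, the forward pass above computes the standard linear model
    #   y = bias + w_b[b_index] + w_numeric^T a
    # with one learned scalar weight per category and per numeric dimension.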

    def compute_output_shape(self, input_shapes):
        batch_size = list(input_shapes.values())[0][0]
        return (batch_size, 1)

    def get_config(self):
        return {
            "feature_columns": self.feature_columns,
        }
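

# Hedged usage sketch, not part of the library API: a minimal Wide & Deep style
# combination of the two layers above, assuming a scalar numeric input "a" and
# an identity categorical input "b" with 100 buckets. All names and shapes here
# are illustrative.
if __name__ == "__main__":
    column_a = tf.feature_column.numeric_column("a", (1,))
    column_b = tf.feature_column.categorical_column_with_identity("b", 100)
    column_b_embedding = tf.feature_column.embedding_column(column_b, 4)

    inputs = {
        "a": tf.keras.Input(name="a", shape=(1,), dtype=tf.float32),
        "b": tf.keras.Input(name="b", shape=(1,), dtype=tf.int64),
    }
    # deep branch consumes the embedding column; wide branch takes the raw
    # identity column, per the class docstrings above
    deep = DenseFeatures([column_a, column_b_embedding])(inputs)
    deep = tf.keras.layers.Dense(1)(deep)
    wide = LinearFeatures([column_a, column_b])(inputs)
    output = tf.keras.activations.sigmoid(deep + wide)

    model = tf.keras.Model(inputs=list(inputs.values()), outputs=output)
    model.summary()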