Source code for merlin.models.tf.inputs.base
#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from typing import Callable, Dict, Optional, Tuple, Type, Union
from tensorflow.keras.layers import Layer
from merlin.models.tf.core.aggregation import SequenceAggregation, SequenceAggregator
from merlin.models.tf.core.base import Block, BlockType
from merlin.models.tf.core.combinators import ParallelBlock, TabularAggregationType
from merlin.models.tf.inputs.continuous import Continuous, ContinuousFeatures
from merlin.models.tf.inputs.embedding import (
ContinuousEmbedding,
EmbeddingFeatures,
EmbeddingOptions,
Embeddings,
SequenceEmbeddingFeatures,
)
from merlin.models.tf.transforms.tensor import ListToDense
from merlin.schema import Schema, Tags, TagsType
# Logger for this module; InputBlock reports its defaulted aggregation through it.
LOG = logging.getLogger(name="merlin-models")
def InputBlock(
    schema: Schema,
    branches: Optional[Dict[str, Block]] = None,
    post: Optional[BlockType] = None,
    aggregation: Optional[TabularAggregationType] = None,
    seq: bool = False,
    max_seq_length: Optional[int] = None,
    add_continuous_branch: bool = True,
    continuous_tags: Optional[Union[TagsType, Tuple[Tags]]] = (Tags.CONTINUOUS,),
    continuous_projection: Optional[Block] = None,
    add_embedding_branch: bool = True,
    embedding_options: EmbeddingOptions = EmbeddingOptions(),
    categorical_tags: Optional[Union[TagsType, Tuple[Tags]]] = (Tags.CATEGORICAL,),
    sequential_tags: Optional[Union[TagsType, Tuple[Tags]]] = (Tags.SEQUENCE,),
    split_sparse: bool = False,
    seq_aggregator: Optional[Block] = None,
    **kwargs,
) -> Block:
    """Build the entry block of a model that processes input features from a schema.

    This function creates a continuous branch and an embedding branch (each only
    when the schema has matching, non-target features) and combines them,
    together with any extra ``branches``, in a ``ParallelBlock``.
    If ``aggregation`` is not set, the block outputs a dictionary of tensors,
    one per input feature. Otherwise the tensors are merged into one using the
    aggregation method.

    Example usage::

        mlp = ml.InputBlock(schema).connect(ml.MLPBlock([64, 32]))

    Parameters
    ----------
    schema: Schema
        Schema of the input data. This Schema object will be automatically generated using
        [NVTabular](https://nvidia-merlin.github.io/NVTabular/main/Introduction.html).
        It can also be constructed manually.
    branches: Dict[str, Block], optional
        Extra branches to use inside the InputBlock. The dictionary is copied,
        so the caller's object is never modified.
    post: Optional[BlockType]
        Transformations to apply to the outputs of the block after it is called.
        Defaults to None
    aggregation: Optional[TabularAggregationType]
        Aggregation used to merge the branch outputs into a single tensor.
        Defaults to None (no merging). When ``split_sparse=True`` and this is
        not set, ``"concat"`` is used.
    seq: bool
        Whether to process inputs for a sequential model (returns 3-D tensor)
        or not (returns 2-D tensor). Use ``seq=True`` to treat the sparse (list) features
        as sequences (e.g. for sequential recommendation) and ``seq=False`` to treat sparse
        features as multi-hot categorical representations.
        Defaults to False
    max_seq_length: Optional[int]
        Only used together with ``seq=True``. Continuous features are converted
        with ``ListToDense(max_seq_length)``, and the value is passed as
        ``max_seq_length`` to ``SequenceEmbeddingFeatures.from_schema``.
        Defaults to None
    add_continuous_branch: bool
        If set, add the branch to process continuous features.
        Defaults to True
    continuous_tags: Optional[Union[TagsType, Tuple[Tags]]]
        Tags to filter the continuous features.
        Defaults to (Tags.CONTINUOUS,)
    continuous_projection: Optional[Block]
        If set, concatenate all numerical features and project them using the
        specified Block. The result is wrapped in a ``ContinuousEmbedding``.
        Defaults to None
    add_embedding_branch: bool
        If set, add the branch to process categorical features.
        Defaults to True
    embedding_options : EmbeddingOptions, optional
        An EmbeddingOptions instance, which allows for a number of
        options for the embedding table, by default EmbeddingOptions()
    categorical_tags: Optional[Union[TagsType, Tuple[Tags]]]
        Tags to filter the categorical features.
        Defaults to (Tags.CATEGORICAL,)
    sequential_tags: Optional[Union[TagsType, Tuple[Tags]]]
        Tags used to select the sparse (sequential) features when
        ``split_sparse=True``.
        Defaults to (Tags.SEQUENCE,)
    split_sparse: bool
        When True, separate the processing of context (2-D) and sparse
        features (3-D). The sparse features are built as their own block and
        added to the context block under the ``"sparse"`` branch.
        Defaults to False
    seq_aggregator: Optional[Block]
        Used only when ``split_sparse=True`` and ``seq=False``. It aggregates
        the sparse features tensor along the sequence axis.
        Defaults to None, which creates a new
        ``SequenceAggregator(SequenceAggregation.MEAN)`` for each call.
    **kwargs
        Forwarded to the final ``ParallelBlock``. They are not forwarded when
        ``split_sparse=True`` or when ``continuous_projection`` is set.

    Returns
    -------
    Block
        One of the following:

        * a ``ParallelBlock`` over the branches;
        * a ``ContinuousEmbedding`` when ``continuous_projection`` is set;
        * with ``split_sparse=True`` and no context features, the block built
          for the sparse features.

    Raises
    ------
    ValueError
        If ``split_sparse=True`` and no feature in ``schema`` matches
        ``sequential_tags``.
    """
    # Work on a private copy. Otherwise a caller-supplied dict would be mutated
    # below, and in the split path the sparse recursive call would leak its
    # "continuous"/"categorical" branches into the context block.
    branches = dict(branches) if branches else {}

    if split_sparse:
        sparse_schema = schema.select_by_tag(sequential_tags)
        context_schema = schema.remove_by_tag(sequential_tags)
        if not sparse_schema:
            raise ValueError(
                "Please make sure that schema has features tagged as 'sequence' when "
                "`split_sparse` is set to True"
            )
        if not aggregation:
            LOG.info(
                "aggregation is not provided, "
                "default `concat` will be used to merge sequential features"
            )
            aggregation = "concat"
        # NOTE(review): the same `post` and `continuous_projection` objects are
        # passed to both recursive calls, and `kwargs` are not forwarded on
        # this path. Confirm this is intended.
        sparse_interactions = InputBlock(
            sparse_schema,
            branches,
            post,
            aggregation=aggregation,
            seq=True,
            max_seq_length=max_seq_length,
            add_continuous_branch=add_continuous_branch,
            continuous_tags=continuous_tags,
            continuous_projection=continuous_projection,
            add_embedding_branch=add_embedding_branch,
            embedding_options=embedding_options,
            categorical_tags=categorical_tags,
            split_sparse=False,
        )
        if not seq:
            # Create the default aggregator per call instead of sharing one
            # layer instance created at import time.
            if seq_aggregator is None:
                seq_aggregator = SequenceAggregator(SequenceAggregation.MEAN)
            sparse_interactions = sparse_interactions.connect(seq_aggregator)
        if not context_schema:
            return sparse_interactions
        branches["sparse"] = sparse_interactions
        return InputBlock(
            context_schema,
            branches,
            post,
            aggregation=aggregation,
            seq=False,
            add_continuous_branch=add_continuous_branch,
            continuous_tags=continuous_tags,
            continuous_projection=continuous_projection,
            add_embedding_branch=add_embedding_branch,
            embedding_options=embedding_options,
            categorical_tags=categorical_tags,
            split_sparse=False,
        )

    # Only add a branch when there are matching features that are not targets.
    if (
        add_continuous_branch
        and schema.select_by_tag(continuous_tags).excluding_by_tag(Tags.TARGET).column_schemas
    ):
        pre = ListToDense(max_seq_length) if max_seq_length and seq else None
        branches["continuous"] = ContinuousFeatures.from_schema(  # type: ignore
            schema,
            tags=continuous_tags,
            pre=pre,
        )

    if (
        add_embedding_branch
        and schema.select_by_tag(categorical_tags).excluding_by_tag(Tags.TARGET).column_schemas
    ):
        emb_cls: Type[EmbeddingFeatures] = SequenceEmbeddingFeatures if seq else EmbeddingFeatures
        emb_kwargs = {}
        if max_seq_length and seq:
            emb_kwargs["max_seq_length"] = max_seq_length
        branches["categorical"] = emb_cls.from_schema(  # type: ignore
            schema, tags=categorical_tags, embedding_options=embedding_options, **emb_kwargs
        )

    if continuous_projection:
        return ContinuousEmbedding(
            ParallelBlock(branches),
            continuous_projection,
            aggregation=aggregation,
            post=post,
            name="continuous_projection",
        )

    return ParallelBlock(branches, aggregation=aggregation, post=post, is_input=True, **kwargs)
# Default `tag_to_block` mapping for `InputBlockV2`.
# When a branch is given as a tag (a key here), InputBlockV2 selects the
# sub-schema with that tag and calls the mapped value with it to build the branch.
INPUT_TAG_TO_BLOCK: Dict[Tags, Callable[[Schema], Layer]] = {
    Tags.CONTINUOUS: Continuous,
    Tags.CATEGORICAL: Embeddings,
}
def InputBlockV2(
    schema: Optional[Schema] = None,
    categorical: Union[Tags, Layer] = Tags.CATEGORICAL,
    continuous: Union[Tags, Layer] = Tags.CONTINUOUS,
    pre: Optional[BlockType] = None,
    post: Optional[BlockType] = None,
    aggregation: Optional[TabularAggregationType] = "concat",
    tag_to_block=INPUT_TAG_TO_BLOCK,
    **branches,
) -> ParallelBlock:
    """Build the entry block of a model from a schema and/or pre-built branches.

    This is a more flexible successor to ``InputBlock``. It accepts externally
    defined blocks (for example a custom ``Embeddings``) for each branch. It is
    meant to become the default after ``22.10``.

    Simple usage::

        inputs = InputBlockV2(schema)

    Custom embeddings::

        inputs = InputBlockV2(
            schema,
            categorical=Embeddings(schema, dim=32)
        )

    Sparse outputs for one-hot::

        inputs = InputBlockV2(
            schema,
            categorical=CategoryEncoding(schema, sparse=True),
            post=ToSparse()
        )

    Continuous projection::

        inputs = InputBlockV2(
            schema,
            continuous=ContinuousProjection(continuous_schema, MLPBlock([32])),
        )

    Merging 2D and 3D inputs (session-based)::

        inputs = InputBlockV2(
            schema,
            post=BroadcastToSequence(context_schema, sequence_schema)
        )

    Parameters
    ----------
    schema : Optional[Schema]
        Schema of the input data, e.g. generated by
        [NVTabular](https://nvidia-merlin.github.io/NVTabular/main/Introduction.html)
        or built manually. Required whenever any branch is given as a tag
        instead of a ``Layer``. It is also passed to the resulting
        ``ParallelBlock``.
    categorical : Union[Tags, Layer], defaults to `Tags.CATEGORICAL`
        A ``Layer`` is used as-is for the ``"categorical"`` branch. Otherwise
        it must be a key of ``tag_to_block``. The matching block is built from
        ``schema.select_by_tag(categorical)``. The branch is skipped when that
        selection is empty.
    continuous : Union[Tags, Layer], defaults to `Tags.CONTINUOUS`
        Same as ``categorical``, but for the ``"continuous"`` branch.
    pre : Optional[BlockType], optional
        Transformation block applied before the branches, by default None.
    post : Optional[BlockType], optional
        Transformation block applied after the branches, by default None.
    aggregation : Optional[TabularAggregationType], optional
        How the branch outputs are aggregated, by default "concat".
    tag_to_block : Dict[Tags, Callable[[Schema], Layer]], optional
        Maps a tag to the callable that builds a branch from the selected
        sub-schema. By default:
            Tags.CONTINUOUS -> Continuous
            Tags.CATEGORICAL -> Embeddings
    **branches : dict
        Extra named branches, handled exactly like ``categorical``/``continuous``.

    Returns
    -------
    ParallelBlock
        A ParallelBlock whose branches are named ``"categorical"``,
        ``"continuous"`` and any extra ``branches`` that were kept.

    Raises
    ------
    ValueError
        * A branch is a tag but ``schema`` is not a ``Schema``.
        * A tag has no entry in ``tag_to_block``.
        * No branch was produced at all.
    """
    candidates = dict(categorical=categorical, continuous=continuous)
    candidates.update(branches)

    resolved = {}
    for branch_name, selector in candidates.items():
        # Pre-built layers are taken verbatim.
        if isinstance(selector, Layer):
            resolved[branch_name] = selector
            continue

        # Anything else is a column-selector and needs a schema to select from.
        if not isinstance(schema, Schema):
            raise ValueError(
                "If you pass a column-selector as a branch, "
                "you must also pass a `schema` argument."
            )
        if selector not in tag_to_block:
            raise ValueError(f"No default-block provided for {selector}")

        selected: Schema = schema.select_by_tag(selector)
        # Branches whose selection is empty are silently dropped.
        if selected:
            block_factory = tag_to_block[selector]
            resolved[branch_name] = block_factory(selected)

    if not resolved:
        raise ValueError("No columns selected for the input block")

    return ParallelBlock(
        resolved,
        pre=pre,
        post=post,
        aggregation=aggregation,
        is_input=True,
        schema=schema,
    )