Source code for transformers4rec.torch.block.base

#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
import inspect
import logging
from collections import OrderedDict
from typing import List, Optional, Union

import torch
from torch.nn import Module

from merlin_standard_lib.utils.misc_utils import filter_kwargs

from ..utils import torch_utils

LOG = logging.getLogger("transformers4rec")


[docs]class BlockBase(torch_utils.OutputSizeMixin, torch.nn.Module, metaclass=abc.ABCMeta):
[docs] def to_model(self, prediction_task_or_head, inputs=None, **kwargs): from ..model.base import Head, Model, PredictionTask if isinstance(prediction_task_or_head, PredictionTask): head = prediction_task_or_head.to_head(self, inputs=inputs, **kwargs) elif isinstance(prediction_task_or_head, Head): head = prediction_task_or_head else: raise ValueError( "`prediction_task_or_head` needs to be a `Head` or `PredictionTask` " f"found: {type(prediction_task_or_head)}" ) return Model(head, **kwargs)
[docs] def as_tabular(self, name=None): from ..tabular.base import AsTabular if not name: name = self.name return SequentialBlock(self, AsTabular(name))
[docs]class Block(BlockBase): def __init__(self, module: torch.nn.Module, output_size: Union[List[int], torch.Size]): super().__init__() self.module = module self._output_size = output_size
[docs] def forward(self, inputs): return self.module(inputs)
[docs] def forward_output_size(self, input_size): if self._output_size[0] is None: batch_size = torch_utils.calculate_batch_size_from_input_size(input_size) return [batch_size] + self._output_size[1:] return self._output_size
[docs]class SequentialBlock(BlockBase, torch.nn.Sequential): def __init__(self, *args, output_size=None): from transformers4rec.torch import TabularSequenceFeatures, TransformerBlock if isinstance(args[0], TabularSequenceFeatures) and any( isinstance(arg, TransformerBlock) for arg in args ): masking = args[0].masking for arg in args: if isinstance(arg, TransformerBlock): if arg.masking != masking: LOG.warning( "Masking is set in the input module but not in the " "TransformerBlock, provide this through the masking argument" ) super().__init__() self._static_output_size = output_size self.input_size = None if len(args) == 1 and isinstance(args[0], OrderedDict): last = None for idx, key, module in enumerate(args[0].items()): self.add_module_and_maybe_build(key, module, last, idx) last = module else: if len(args) == 1 and isinstance(args[0], list): args = args[0] last = None for idx, module in enumerate(args): last = self.add_module_and_maybe_build(str(idx), module, last, idx) @property def inputs(self): from transformers4rec.torch import TabularFeatures, TabularSequenceFeatures first = list(self)[0] if isinstance(first, (TabularSequenceFeatures, TabularFeatures)): return first
[docs] def add_module(self, name: str, module: Optional[Module]) -> None: from ..tabular.base import FilterFeatures if isinstance(module, list): module = FilterFeatures(module) super().add_module(name, module)
[docs] def add_module_and_maybe_build(self, name: str, module, parent, idx) -> torch.nn.Module: # Check if module needs to be built if getattr(parent, "output_size", None) and getattr(module, "build", None): module = module.build(parent.output_size()) if idx == 0: self.input_size = getattr(module, "input_size", None) self.add_module(name, module) return module
def __rrshift__(self, other): return right_shift_block(self, other) def __rshift__(self, other): # pylint: disable=arguments-out-of-order return right_shift_block(other, self)
[docs] def forward(self, input, training=False, testing=False, **kwargs): # from transformers4rec.torch import TabularSequenceFeatures for i, module in enumerate(self): if i == len(self) - 1: filtered_kwargs = filter_kwargs(kwargs, module, filter_positional_or_keyword=False) input = module(input, **filtered_kwargs) elif "training" in inspect.signature(module.forward).parameters: if "testing" in inspect.signature(module.forward).parameters: input = module(input, training=training, testing=testing) else: input = module(input, training=training) else: input = module(input) return input
[docs] def build(self, input_size, schema=None, **kwargs): output_size = input_size for module in self: if not hasattr(module, "build"): break module.build(output_size, schema=schema) output_size = module.output_size() return super(SequentialBlock, self).build(input_size, schema=None, **kwargs)
[docs] def as_tabular(self, name=None): from transformers4rec.torch import AsTabular if not name: name = self.name return SequentialBlock(self, AsTabular(name))
def __add__(self, other): from ..tabular.base import merge_tabular return merge_tabular(self, other)
[docs] def forward_output_size(self, input_size): if self._static_output_size: return self._static_output_size x = input_size for module in list(self): if getattr(module, "output_size", None): x = module.output_size(x) else: # TODO log warning here return None return x
[docs] @staticmethod def get_children_by_class_name(parent, *class_name): children = [] def add_if_class_name_matches(to_check): if to_check.__class__.__name__ in class_name: children.append(to_check) for child in parent: if getattr(child, "merge_values", None): for to_merge in child.merge_values: add_if_class_name_matches(to_merge) add_if_class_name_matches(child) return children
[docs]def build_blocks(*modules): return list(SequentialBlock(*modules))
[docs]class BuildableBlock(abc.ABC):
[docs] @abc.abstractmethod def build(self, input_size) -> BlockBase: raise NotImplementedError
def __rrshift__(self, other): return right_shift_block(self, other)
[docs] def to_module(self, shape_or_module): shape = shape_or_module if isinstance(shape_or_module, torch.nn.Module): shape = getattr(shape_or_module, "output_size", None) if shape: shape = shape() return self.build(shape)
[docs]def right_shift_block(self, other): from ..tabular.base import FilterFeatures if isinstance(other, list): left_side = [FilterFeatures(other)] else: left_side = list(other) if isinstance(other, SequentialBlock) else [other] right_side = list(self) if isinstance(self, SequentialBlock) else [self] # Maybe build right-side if hasattr(left_side[-1], "output_size") and left_side[-1].output_size(): _right_side = [] x = left_side[-1].output_size() for module in right_side: if getattr(module, "build", None): if "parents" in inspect.signature(module.build).parameters: build = module.build(x, left_side) else: build = module.build(x) if build: module = build x = module.output_size() if hasattr(module, "output_size") else None _right_side.append(module) right_side = _right_side sequential = left_side + right_side need_moving_to_gpu = False if isinstance(self, torch.nn.Module): need_moving_to_gpu = need_moving_to_gpu or torch_utils.check_gpu(self) if isinstance(other, torch.nn.Module): need_moving_to_gpu = need_moving_to_gpu or torch_utils.check_gpu(other) out = SequentialBlock(*sequential) if getattr(left_side[-1], "input_size", None) and left_side[-1].input_size: out.input_size = left_side[-1].input_size if need_moving_to_gpu: out.to("cuda") return out
BlockType = Union[BlockBase, BuildableBlock] BlockOrModule = Union[BlockBase, BuildableBlock, torch.nn.Module]