Source code for nvtabular.ops.lambdaop
#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from inspect import getsourcelines, signature
from merlin.core.dispatch import DataFrameType, annotate
from nvtabular.ops.operator import ColumnSelector, Operator


class LambdaOp(Operator):
    """
    LambdaOp allows you to apply row-level functions to an NVTabular workflow.

    Example usage 1::

        # Define a ColumnSelector that LambdaOp will apply to,
        # then define a custom function, e.g. extract the first 5 characters from a string
        lambda_feature = ColumnSelector(["col1"])
        new_lambda_feature = lambda_feature >> LambdaOp(lambda col: col.str.slice(0, 5))
        workflow = nvtabular.Workflow(new_lambda_feature + 'label')

    Example usage 2::

        # Define a custom function, e.g. calculate the probability of different events,
        # then rename each new feature column.
        lambda_features = ColumnSelector(['event1', 'event2', 'event3'])  # columns f is applied to

        def cond_prob(col, gdf):
            col = col.astype(np.float32)
            col = col / gdf['total_events']
            return col

        new_lambda_features = (
            lambda_features
            >> LambdaOp(cond_prob, dependency=["total_events"])
            >> Rename(postfix="_cond")
        )
        workflow = nvtabular.Workflow(new_lambda_features + 'label')
    Parameters
    -----------
    f : callable
        Defines a function that takes a Series and an optional DataFrame as input,
        and returns a new Series as the output.
    dependency : list, default None
        Additional columns that ``f`` needs to read (they are made available in the
        DataFrame passed as the second argument) but that are not themselves
        transformed or included in the output.
    """
    def __init__(self, f, dependency=None, label=None, dtype=None, tags=None, properties=None):
        super().__init__()
        if f is None:
            raise ValueError("f cannot be None. LambdaOp op applies f to dataframe")
        self.f = f
        self._param_count = len(signature(self.f).parameters)
        if self._param_count not in (1, 2):
            raise ValueError("lambda function must accept either one or two parameters")
        self.dependency = dependency
        self._label = label
        self._dtype = dtype
        self._tags = tags or []
        self._properties = properties or {}
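
    # A minimal sketch of ``transform`` (which the ``__doc__`` assignment below refers
    # to), assuming the behavior documented above: ``f`` is applied to each selected
    # column, and the full dataframe is passed as a second argument when ``f`` declares
    # two parameters. The annotation label is illustrative.
    @annotate("LambdaOp_transform", color="darkgreen", domain="nvt_python")
    def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
        new_df = type(df)()
        for col in col_selector.names:
            if self._param_count == 2:
                new_df[col] = self.f(df[col], df)
            elif self._param_count == 1:
                new_df[col] = self.f(df[col])
            else:
                # validated in __init__, so this should never happen
                raise RuntimeError(f"unhandled lambda param count {self._param_count}")
        return new_df
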
    transform.__doc__ = Operator.transform.__doc__
    @property
    def dependencies(self):
        return self.dependency
    @property
    def label(self):
        # if we're given an explicit label to use, return it
        if self._label:
            return self._label
        # if we have a named function (not a lambda) return the function name
        name = self.f.__name__
        if name != "<lambda>":
            return name
        else:
            try:
                # otherwise get the lambda source code from the inspect module if possible
                source = getsourcelines(self.f)[0][0]
                lambdas = [op.strip() for op in source.split(">>") if "lambda " in op]
                if len(lambdas) == 1 and lambdas[0].count("lambda") == 1:
                    return lambdas[0]
            except Exception:  # pylint: disable=broad-except
                # we can fail to load the source in distributed environments. Since the
                # label is mainly used for diagnostics, don't worry about the error here and
                # fallback to the default labelling
                pass
        # Failed to figure out the source
        return "LambdaOp"
    def column_mapping(self, col_selector):
        filtered_selector = self._remove_deps(col_selector, self.dependencies)
        return super().column_mapping(filtered_selector) 
    def _remove_deps(self, col_selector, dependencies):
        dependencies = dependencies or []
        to_skip = ColumnSelector(
            [
                dep if isinstance(dep, str) else dep.output_schema.column_names
                for dep in dependencies
            ]
        )
        return col_selector.filter_columns(to_skip)
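
    # For illustration: with col_selector = ColumnSelector(["event1", "event2", "total_events"])
    # and dependency=["total_events"], column_mapping drops "total_events" from the
    # mapping, so only "event1" and "event2" appear as output columns even though the
    # dependency column is still available to ``f`` through the dataframe.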
    @property
    def dynamic_dtypes(self):
        return True
    @property
    def output_dtype(self):
        return self._dtype
    @property
    def output_tags(self):
        return self._tags
    @property
    def output_properties(self):
        return self._properties