#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections.abc
from nvtabular.columns import ColumnSelector, Schema
from nvtabular.ops import LambdaOp, Operator, internal
from nvtabular.ops.internal import ConcatColumns, SelectionOp, SubsetColumns, SubtractionOp
[docs]class WorkflowNode:
"""A WorkflowNode is a group of columns that you want to apply the same transformations to.
WorkflowNode's can be transformed by shifting operators on to them, which returns a new
WorkflowNode with the transformations applied. This lets you define a graph of operations
that makes up your workflow
Parameters
----------
selector: ColumnSelector
Defines which columns to select from the input Dataset using column names and tags.
"""
def __init__(self, selector=None):
self.parents = []
self.children = []
self.dependencies = []
self.op = None
self.input_schema = None
self.output_schema = None
if isinstance(selector, list):
selector = ColumnSelector(selector)
if selector and not isinstance(selector, ColumnSelector):
raise TypeError("The selector argument must be a list or a ColumnSelector")
if selector:
self.op = SelectionOp(selector)
self._selector = selector
@property
def selector(self):
if not self._selector and any(parent._selector for parent in self.parents):
return _combine_selectors(self.parents)
else:
return self._selector
@selector.setter
def selector(self, sel):
if isinstance(sel, list):
sel = ColumnSelector(sel)
self._selector = sel
# These methods must maintain grouping
def add_dependency(self, dep):
dep_nodes = _nodify(dep)
self.dependencies.append(dep_nodes)
def add_child(self, child):
child_nodes = _nodify(child)
if not isinstance(child_nodes, list):
child_nodes = [child_nodes]
for child_node in child_nodes:
child_node.parents.append(self)
self.children.extend(child_nodes)
def add_parent(self, parent):
parent_nodes = _nodify(parent)
if not isinstance(parent_nodes, list):
parent_nodes = [parent_nodes]
for parent_node in parent_nodes:
parent_node.children.append(self)
self.parents.extend(parent_nodes)
def compute_schemas(self, root_schema):
# If parent is an addition node, we may need to propagate grouping
# unless we're a node that already has a selector
if not self.selector:
if (
len(self.parents) == 1
and isinstance(self.parents[0].op, internal.ConcatColumns)
and self.parents[0].selector
and (self.parents[0].selector.names)
):
self.selector = self.parents[0].selector
if isinstance(self.op, ConcatColumns): # +
# For addition nodes, some of the operands are parents and
# others are dependencies so grab schemas from both
self.selector = _combine_selectors(self.grouped_parents_with_dependencies)
self.input_schema = _combine_schemas(self.parents_with_dependencies)
elif isinstance(self.op, SubtractionOp): # -
left_operand = _combine_schemas(self.parents)
if self.dependencies:
right_operand = _combine_schemas(self.dependencies)
self.input_schema = left_operand - right_operand
else:
self.input_schema = left_operand.apply_inverse(self.op.selector)
self.selector = ColumnSelector(self.input_schema.column_names)
elif isinstance(self.op, SubsetColumns): # []
left_operand = _combine_schemas(self.parents)
right_operand = _combine_schemas(self.dependencies)
self.input_schema = left_operand - right_operand
# If we have a selector, apply it to upstream schemas from nodes/dataset
elif isinstance(self.op, SelectionOp): # ^
upstream_schema = root_schema + _combine_schemas(self.parents_with_dependencies)
self.input_schema = upstream_schema.apply(self.selector)
# If none of the above apply, then we don't have a selector
# and we're not an add or sub node, so our input is just the
# parents output
else:
self.input_schema = _combine_schemas(self.parents)
# Then we delegate to the op (if there is one) to compute this node's
# output schema. If there's no op, then outputs are just the inputs
if self.op:
self.output_schema = self.op.compute_output_schema(self.input_schema, self.selector)
else:
self.output_schema = self.input_schema
[docs] def __rshift__(self, operator):
"""Transforms this WorkflowNode by applying an Operator
Parameters
-----------
operators: Operator or callable
Returns
-------
WorkflowNode
"""
if isinstance(operator, type) and issubclass(operator, Operator):
# handle case where an operator class is passed
operator = operator()
elif callable(operator):
# implicit lambdaop conversion.
operator = LambdaOp(operator)
if not isinstance(operator, Operator):
raise ValueError(f"Expected operator or callable, got {operator.__class__}")
child = WorkflowNode()
child.op = operator
child.add_parent(self)
dependencies = operator.dependencies()
if dependencies:
if not isinstance(dependencies, collections.abc.Sequence):
dependencies = [dependencies]
for dependency in dependencies:
child.add_dependency(dependency)
return child
[docs] def __add__(self, other):
"""Adds columns from this WorkflowNode with another to return a new WorkflowNode
Parameters
-----------
other: WorkflowNode or str or list of str
Returns
-------
WorkflowNode
"""
if isinstance(self.op, internal.ConcatColumns):
child = self
else:
# Create a child node
child = WorkflowNode()
child.op = internal.ConcatColumns(label="+")
child.add_parent(self)
# The right operand becomes a dependency
other_nodes = _nodify(other)
other_nodes = [other_nodes]
for other_node in other_nodes:
# If the other node is a `+` node, we want to collapse it into this `+` node to
# avoid creating a cascade of repeated `+`s that we'd need to optimize out by
# re-combining them later in order to clean up the graph
if not isinstance(other_node, list) and isinstance(
other_node.op, internal.ConcatColumns
):
child.dependencies += other_node.grouped_parents_with_dependencies
else:
child.add_dependency(other_node)
return child
# handle the "column_name" + WorkflowNode case
__radd__ = __add__
[docs] def __sub__(self, other):
"""Removes columns from this WorkflowNode with another to return a new WorkflowNode
Parameters
-----------
other: WorkflowNode or str or list of str
Columns to remove
Returns
-------
WorkflowNode
"""
other_nodes = _nodify(other)
if not isinstance(other_nodes, list):
other_nodes = [other_nodes]
child = WorkflowNode()
child.add_parent(self)
child.op = internal.SubtractionOp()
for other_node in other_nodes:
if isinstance(other_node.op, SelectionOp) and not other_node.parents_with_dependencies:
child.selector += other_node.selector
child.op.selector += child.selector
else:
child.add_dependency(other_node)
return child
def __rsub__(self, other):
left_operand = _nodify(other)
right_operand = self
if not isinstance(left_operand, list):
left_operand = [left_operand]
child = WorkflowNode()
child.add_parent(left_operand)
child.op = internal.SubtractionOp()
if (
isinstance(right_operand.op, SelectionOp)
and not right_operand.parents_with_dependencies
):
child.selector += right_operand.selector
child.op.selector += child.selector
else:
child.add_dependency(right_operand)
return child
[docs] def __getitem__(self, columns):
"""Selects certain columns from this WorkflowNode, and returns a new Columngroup with only
those columns
Parameters
-----------
columns: str or list of str
Columns to select
Returns
-------
WorkflowNode
"""
col_selector = ColumnSelector(columns)
child = WorkflowNode(col_selector)
child.op = internal.SubsetColumns(label=str(list(columns)))
child.add_parent(self)
return child
def __repr__(self):
output = " output" if not self.children else ""
return f"<WorkflowNode {self.label}{output}>"
@property
def parents_with_dependencies(self):
nodes = []
for node in self.parents + self.dependencies:
if isinstance(node, list):
nodes.extend(node)
else:
nodes.append(node)
return nodes
@property
def grouped_parents_with_dependencies(self):
return self.parents + self.dependencies
@property
def input_columns(self):
if self.input_schema is None:
raise RuntimeError(
"The input columns aren't computed until the workflow "
"is fit to a dataset or input schema."
)
if self.selector:
# To maintain column groupings
return self.selector
else:
return ColumnSelector(self.input_schema.column_names)
@property
def output_columns(self):
if self.output_schema is None:
raise RuntimeError(
"The output columns aren't computed until the workflow "
"is fit to a dataset or input schema."
)
return ColumnSelector(self.output_schema.column_names)
@property
def dependency_schema(self):
return _combine_schemas(self.dependencies)
@property
def dependency_columns(self):
return _combine_selectors(self.dependencies)
@property
def label(self):
if self.op and hasattr(self.op, "label"):
return self.op.label
elif self.op:
return str(type(self.op))
elif not self.parents:
return f"input cols=[{self._cols_repr}]"
else:
return "??"
@property
def _cols_repr(self):
if self.input_schema:
columns = self.input_schema.column_names
elif self.selector:
columns = self.selector.names
else:
columns = []
cols_repr = ", ".join(map(str, columns[:3]))
if len(columns) > 3:
cols_repr += "..."
return cols_repr
@property
def graph(self):
return _to_graphviz(self)
def iter_nodes(nodes):
queue = nodes[:]
while queue:
current = queue.pop()
if isinstance(current, list):
queue.extend(current)
else:
yield current
# TODO: deduplicate nodes?
for node in current.parents_with_dependencies:
queue.append(node)
def _filter_by_type(elements, type_):
results = []
for elem in elements:
if isinstance(elem, type_):
results.append(elem)
elif isinstance(elem, list):
results += _filter_by_type(elem, type_)
return results
def _combine_schemas(elements):
combined = Schema()
for elem in elements:
if isinstance(elem, WorkflowNode):
combined += elem.output_schema
elif isinstance(elem, ColumnSelector):
combined += Schema(elem.names)
elif isinstance(elem, list):
combined += _combine_schemas(elem)
return combined
def _combine_selectors(elements):
combined = ColumnSelector()
for elem in elements:
if isinstance(elem, WorkflowNode):
if elem.selector:
selector = elem.op.output_column_names(elem.selector)
elif elem.output_schema:
selector = ColumnSelector(elem.output_schema.column_names)
elif elem.input_schema:
selector = ColumnSelector(elem.input_schema.column_names)
selector = elem.op.output_column_names(selector)
else:
selector = ColumnSelector()
combined += selector
elif isinstance(elem, ColumnSelector):
combined += elem
elif isinstance(elem, str):
combined += ColumnSelector(elem)
elif isinstance(elem, list):
combined += ColumnSelector(subgroups=_combine_selectors(elem))
return combined
def _to_selector(value):
if not isinstance(value, (ColumnSelector, WorkflowNode)):
return ColumnSelector(value)
else:
return value
def _strs_to_selectors(elements):
return [_to_selector(elem) for elem in elements]
[docs]def _to_graphviz(workflow_node):
"""Converts a WorkflowNode to a GraphViz DiGraph object useful for display in notebooks"""
from graphviz import Digraph
graph = Digraph()
# get all the nodes from parents of this columngroup
# and add edges between each of them
allnodes = list(set(iter_nodes([workflow_node])))
node_ids = {v: str(k) for k, v in enumerate(allnodes)}
for node, nodeid in node_ids.items():
graph.node(nodeid, node.label)
for parent in node.parents_with_dependencies:
graph.edge(node_ids[parent], nodeid)
if node.selector and node.selector.names:
selector_id = f"{nodeid}_selector"
graph.node(selector_id, str(node.selector.names))
graph.edge(selector_id, nodeid)
# add a single 'output' node representing the final state
output_node_id = str(len(allnodes))
output_string = "output cols"
if workflow_node._cols_repr:
output_string += f"=[{workflow_node._cols_repr}]"
graph.node(output_node_id, output_string)
graph.edge(node_ids[workflow_node], output_node_id)
return graph
def _convert_col(col):
if isinstance(col, (str, tuple)):
return col
elif isinstance(col, list):
return tuple(col)
else:
raise ValueError(f"Invalid column value for WorkflowNode: {col}")
def _nodify(nodable):
if isinstance(nodable, str):
return WorkflowNode(ColumnSelector([nodable]))
if isinstance(nodable, ColumnSelector):
return WorkflowNode(nodable)
elif isinstance(nodable, WorkflowNode):
return nodable
elif isinstance(nodable, list):
nodes = [_nodify(node) for node in nodable]
non_selection_nodes = [node for node in nodes if not node.selector]
selection_nodes = [node.selector for node in nodes if node.selector]
selection_nodes = (
[WorkflowNode(_combine_selectors(selection_nodes))] if selection_nodes else []
)
return non_selection_nodes + selection_nodes
else:
raise TypeError(
"Unsupported type: Cannot convert object " f"of type {type(nodable)} to WorkflowNode."
)