Source code for merlin.dataloader.tf_utils
#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import warnings
import tensorflow as tf
from packaging import version
from tensorflow.python.feature_column import feature_column_v2 as fc
from merlin.core.dispatch import HAS_GPU
from merlin.core.utils import device_mem_size
[docs]def configure_tensorflow(memory_allocation=None, device=None):
"""Control the GPU memory allocation that is performed when using TensorFlow.
Example usage::
# Allocate 20% of GPU memory to TensorFlow on the first
# GPU in the system.
configure_tensorflow(.2, 0)
Parameters
----------
memory_allocation : float, optional
Value between 0 and 1 that represents the fraction of GPU memory to
allocate to TensorFlow.
This parameter overrides the ``TF_MEMORY_ALLOCATION`` environment
variable.
If you do not specify a value and the environment variable is not set,
50% of GPU memory is allocated to TensorFlow.
The default value is None.
device : Int, optional
Integer representing the index of the GPU to run on.
This parameter overrides the ``TF_VISIBLE_DEVICE`` environment variable.
The default value is None.
"""
total_gpu_mem_mb = device_mem_size(kind="total", cpu=(not HAS_GPU)) / (1024**2)
if memory_allocation is None:
memory_allocation = os.environ.get("TF_MEMORY_ALLOCATION", 0.5)
if float(memory_allocation) < 1:
memory_allocation = total_gpu_mem_mb * float(memory_allocation)
memory_allocation = int(memory_allocation)
assert memory_allocation < total_gpu_mem_mb
# TODO: what will this look like in any sort
# of distributed set up?
if device is None:
device = int(os.environ.get("TF_VISIBLE_DEVICE", 0))
tf_devices = tf.config.list_physical_devices("GPU")
if HAS_GPU and len(tf_devices) == 0:
raise ImportError("TensorFlow is not configured for GPU")
if HAS_GPU:
try:
tf.config.set_logical_device_configuration(
tf_devices[device],
[tf.config.LogicalDeviceConfiguration(memory_limit=memory_allocation)],
)
except RuntimeError:
warnings.warn(
"TensorFlow runtime already initialized, may not be enough memory for cudf"
)
try:
tf.config.experimental.set_virtual_device_configuration(
tf_devices[device],
[tf.config.experimental.VirtualDeviceConfiguration(memory_limit=memory_allocation)],
)
except RuntimeError as e:
# Virtual devices must be set before GPUs have been initialized
warnings.warn(e)
# versions using TF earlier than 2.3.0 need to use extension
# library for dlpack support to avoid memory leak issue
__TF_DLPACK_STABLE_VERSION = "2.3.0"
if version.parse(tf.__version__) < version.parse(__TF_DLPACK_STABLE_VERSION):
try:
from tfdlpack import from_dlpack
except ModuleNotFoundError as e:
message = "If using TensorFlow < 2.3.0, you must install tfdlpack-gpu extension library"
raise ModuleNotFoundError(message) from e
else:
from tensorflow.experimental.dlpack import from_dlpack
return from_dlpack
def _get_parents(column):
"""
recursive function for finding the feature columns
that supply inputs for a given `column`. If there are
none, returns the column. Uses sets so is not
deterministic.
"""
if isinstance(column.parents[0], str):
return set([column])
parents = set()
for parent in column.parents:
parents |= _get_parents(parent)
return parents
[docs]def get_dataset_schema_from_feature_columns(feature_columns):
"""Maps from a list of TensorFlow ``feature_column`` to
two lists.
The first list provides the categorical feature names and the second list
provides the continuous feature names for a dataset.
This function is useful for constructing NVTabular Workflows from feature
columns.
Returns
-------
list[str], list[str]
One sorted list of categorical feature names and one sorted list of
continuous feature names.
The lists can have zero length.
"""
base_columns = set()
for column in feature_columns:
base_columns |= _get_parents(column)
cat_names, cont_names = [], []
for column in base_columns:
if isinstance(column, fc.CategoricalColumn):
cat_names.append(column.name)
else:
cont_names.append(column.name)
cat_names = sorted(cat_names)
cont_names = sorted(cont_names)
return cat_names, cont_names