Source code for sparse_operation_kit.core.initialize

#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from sparse_operation_kit import kit_lib
from tensorflow.python.ops import collective_ops
import tensorflow.distribute as tf_dist

MirroredStrategy = tf_dist.MirroredStrategy
try:
    MultiWorkerMirroredStrategy = tf_dist.MultiWorkerMirroredStrategy
except AttributeError:
    MultiWorkerMirroredStrategy = tf_dist.experimental.MultiWorkerMirroredStrategy
from tensorflow import constant, TensorShape, function
from tensorflow.dtypes import int32, int64
from tensorflow import print as tf_print
from tensorflow.python.ops import array_ops
from tensorflow.python.framework import ops
from tensorflow.python.platform import tf_logging as logging
import sys
from tensorflow.python.framework import config


def Init(**kwargs):
    """
    Abbreviated as ``sok.Init(**kwargs)``.

    This function is used to do the initialization of SparseOperationKit (SOK).

    SOK will leverage all available GPUs for the current CPU process. Please set
    `CUDA_VISIBLE_DEVICES` or `tf.config.set_visible_devices` to specify which
    GPU(s) are used in this process before launching the tensorflow runtime
    and calling this function.

    In **TensorFlow 2.x**, SOK can be used with **tf.distribute.Strategy** or **Horovod**.
    When it's used with tf.distribute.Strategy, it must be called under `strategy.scope()`.
    For example,

    .. code-block:: python

        with strategy.scope():
            sok.Init(**kwargs)

    When it's used with Horovod, it must be called at each process. For example,

    .. code-block:: python

        import horovod.tensorflow as hvd

        hvd.init()
        sok.Init(**kwargs)

    In **TensorFlow 1.15**, SOK can only work with **Horovod**. The returned status
    must be evaluated with `sess.run`, and it must be the first step before
    evaluating any other SOK APIs.

    .. code-block:: python

        sok_init = sok.Init(global_batch_size=args.global_batch_size)
        with tf.Session() as sess:
            sess.run(sok_init)
            ...

    Parameters
    ----------
    kwargs: dictionary
            keyword arguments for this function.
            Currently, it must contain `global_batch_size`, which is used by all GPUs.

    Returns
    -------
    status: string
            a string is returned if this function executes successfully,
            and its content will be 'OK'.
    """

    def _get_visible_devices():
        gpus = config.get_visible_devices("GPU")
        assert len(gpus) > 0
        visible_devices = []
        for i in range(len(gpus)):
            visible_devices.append(int(gpus[i].name.split(":")[-1]))
        return array_ops.constant(visible_devices, dtype=int32)

    # @function
    def _single_worker_init(**kwargs):
        replica_ctx = tf_dist.get_replica_context()
        replica_ctx.merge_call(
            lambda strategy: tf_print("You are using the plugin with MirroredStrategy.")
        )
        nccl_unique_id = replica_ctx.merge_call(lambda strategy: kit_lib.get_nccl_unique_id())
        global_random_seed = kwargs.get("seed", None) or replica_ctx.merge_call(
            lambda strategy: kit_lib.gen_random_seed()
        )
        global_id = replica_ctx.replica_id_in_sync_group
        visible_devices = _get_visible_devices()
        status = kit_lib.plugin_init(
            global_id,
            replica_ctx.num_replicas_in_sync,
            nccl_unique_id,
            global_random_seed,
            visible_devices,
            global_batch_size=kwargs["global_batch_size"],
        )
        return status

    def _multi_worker_init(**kwargs):
        replica_ctx = tf_dist.get_replica_context()
        global_id = replica_ctx.replica_id_in_sync_group
        if global_id == 0:
            # The chief replica creates the NCCL unique id and broadcasts it
            # (a 32-element int32 tensor) to all other replicas.
            unique_id = kit_lib.get_nccl_unique_id()
            re = collective_ops.broadcast_send(
                unique_id,
                TensorShape([32]),
                int32,
                group_size=replica_ctx.num_replicas_in_sync,
                group_key=1,
                instance_key=2,
            )
        else:
            re = collective_ops.broadcast_recv(
                TensorShape([32]),
                int32,
                group_size=replica_ctx.num_replicas_in_sync,
                group_key=1,
                instance_key=2,
            )
        if global_id == 0:
            # The chief replica also decides the global random seed (either the
            # user-provided one or a generated one) and broadcasts it as int64.
            global_seed = kwargs.get("seed", None) or kit_lib.gen_random_seed()
            re_seed = collective_ops.broadcast_send(
                global_seed,
                TensorShape([1]),
                int64,
                group_size=replica_ctx.num_replicas_in_sync,
                group_key=1,
                instance_key=3,
            )
        else:
            global_seed = kwargs.get("seed", None)
            re_seed = collective_ops.broadcast_recv(
                TensorShape([1]),
                int64,
                group_size=replica_ctx.num_replicas_in_sync,
                group_key=1,
                instance_key=3,
            )
            if global_seed and global_seed != re_seed:
                logging.warning(
                    "The seed: {} is not consistent with that from the chief-node: {}, "
                    "and the seed from the chief-node will be used.".format(global_seed, re_seed)
                )
        visible_devices = _get_visible_devices()
        status = kit_lib.plugin_init(
            global_id,
            replica_ctx.num_replicas_in_sync,
            re,
            re_seed,
            visible_devices,
            global_batch_size=kwargs["global_batch_size"],
        )
        return status

    # @function
    def _horovod_init(**kwargs):
        r"""
        This function uses horovod to broadcast the nccl-id and random-seed which are
        used by sparse_operation_kit. Please note that the nccl-comm mentioned here is
        not the same one as the nccl-comm of horovod itself.

        After broadcasting, this function calls kit_lib.plugin_init with the "nccl-id"
        and "random-seed" to initialize sparse_operation_kit.
        """
        local_rank = hvd.local_rank()
        unique_id = (
            kit_lib.get_nccl_unique_id()
            if local_rank == 0
            else array_ops.zeros([32], dtype=int32)
        )
        unique_id = hvd.broadcast(unique_id, root_rank=0, name="nccl_unique_id")
        seed = kwargs.get("seed", None)
        if 0 == local_rank:
            global_seed = seed or kit_lib.gen_random_seed()
        else:
            global_seed = array_ops.zeros([1], dtype=int64)
        re_seed = hvd.broadcast(global_seed, root_rank=0, name="random_seed")
        if seed and seed != re_seed:
            logging.warning(
                "The seed: {} is not consistent with that from the chief-node: {}, "
                "and the seed from the chief-node will be used.".format(global_seed, re_seed)
            )
        visible_devices = _get_visible_devices()
        status = kit_lib.plugin_init(
            local_rank,
            hvd.size(),
            unique_id,
            re_seed,
            visible_devices,
            global_batch_size=kwargs["global_batch_size"],
        )
        return status

    def _one_device_init(**kwargs):
        """
        This function is used to initialize SOK with only one GPU.
        """
        local_rank = 0
        unique_id = kit_lib.get_nccl_unique_id()
        global_seed = kwargs.get("seed", None) or kit_lib.gen_random_seed()
        visible_devices = _get_visible_devices()
        status = kit_lib.plugin_init(
            local_rank,
            1,
            unique_id,
            global_seed,
            visible_devices,
            global_batch_size=kwargs["global_batch_size"],
        )
        return status

    if tf_dist.has_strategy():
        strategy = tf_dist.get_strategy()

        @function
        def _init_wrapper(run_fn, init_fn, **kwargs):
            return run_fn(init_fn, kwargs=kwargs)

        if isinstance(strategy, MirroredStrategy):
            _init_fn = _single_worker_init
        elif isinstance(strategy, MultiWorkerMirroredStrategy):
            _init_fn = _multi_worker_init
        else:
            raise RuntimeError("This strategy type is not supported yet.")

        if not kit_lib.in_tensorflow2():
            _init_results = _init_wrapper(strategy.experimental_run_v2, _init_fn, **kwargs)
            if hasattr(_init_results, "values"):
                _init_results = _init_results.values
            return _init_results
        else:
            return _init_wrapper(strategy.run, _init_fn, **kwargs)
    elif "horovod.tensorflow" in sys.modules:
        # horovod has been imported
        import horovod.tensorflow as hvd

        if not kit_lib.in_tensorflow2():

            @function
            def _init_wrapper(**kwargs):
                return _horovod_init(**kwargs)

            return _init_wrapper(**kwargs)
        else:
            return _horovod_init(**kwargs)
    else:
        # horovod has not been imported
        return _one_device_init(**kwargs)
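
For reference, the sketch below shows one way to call the ``Init`` function above under TensorFlow 2.x with ``tf.distribute.MirroredStrategy``. It is a minimal sketch rather than part of this module: the number of GPUs and the ``global_batch_size`` value are illustrative choices.

# A minimal sketch, assuming TensorFlow 2.x with SparseOperationKit installed
# and at least two GPUs available to this process; the device count and the
# global_batch_size value (8192) are illustrative, not required by the module.
import tensorflow as tf
import sparse_operation_kit as sok

# Restrict the GPUs used by this process before touching the TF runtime,
# as required by sok.Init (setting CUDA_VISIBLE_DEVICES works as well).
gpus = tf.config.list_physical_devices("GPU")
tf.config.set_visible_devices(gpus[:2], "GPU")

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    # Init must be called under strategy.scope(); it returns an 'OK' status
    # string when initialization succeeds.
    status = sok.Init(global_batch_size=8192)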