#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
try:
    import cupy
    import cupy.sparse
except ImportError:
    cupy = None
import numba
import numpy
import pandas as pd
import scipy.sparse
try:
    from cupyx.scipy.sparse import coo_matrix
except ImportError:
    from scipy.sparse import coo_matrix
from merlin.core.dispatch import DataFrameType, annotate
from merlin.schema import Schema, Tags
from nvtabular.ops.operator import ColumnSelector, Operator
class ColumnSimilarity(Operator):
    """Calculates the similarity between two columns using tf-idf, cosine or
    inner product as the distance metric. For each row, this calculates the distance
    between the two columns by looking up features for those columns in a sparse matrix,
    and then computing the distance between the rows of the feature matrices.

    Example usage::

        # Read in the 'document_categories' file from the kaggle outbrains dataset and convert
        # to a sparse matrix
        df = cudf.read_csv("document_categories.csv.zip")
        categories = cupyx.scipy.sparse.coo_matrix((cupy.ones(len(df)),
                                                   (df.document_id.values, df.category_id.values)))
        # compute a new column 'document_id_document_id_promo_sim' between the document_id and
        # document_id_promo columns on tfidf distance on the categories matrix we just loaded up
        sim_features = [["document_id", "document_id_promo"]] >> ColumnSimilarity(categories,
                                                                metric='tfidf', on_device=False)
        workflow = nvt.Workflow(sim_features)

    Parameters
    -----------
    left_features : csr_matrix
        Sparse feature matrix for the left column
    right_features : csr_matrix, optional
        Sparse feature matrix for the right column in each pair. If not given will use the
        same feature matrix as for the left (for example when calculating document-document
        distances)
    metric : str
        Distance metric to use when comparing feature rows: "tfidf", "cosine"
        or "inner". Unknown values raise ValueError at first use.
    on_device : bool
        Whether to compute on the GPU or CPU. Computing on the GPU will be
        faster, but requires that the left_features/right_features sparse matrices
        fit into GPU memory.
    """

    def __init__(self, left_features, right_features=None, metric="tfidf", on_device=True):
        super().__init__()
        self.metric = metric
        self.left_features = left_features
        self.right_features = right_features
        self.on_device = on_device
        # feature matrices are normalized lazily on first use, so constructing
        # the operator stays cheap and touches no GPU memory
        self._initialized = False

    def _initialize_features(self):
        # One-time conversion/normalization of the feature matrices into CSR
        # form appropriate for self.metric (see _convert_features).
        if not self._initialized:
            self.left_features = _convert_features(self.left_features, self.metric, self.on_device)
            self.right_features = (
                _convert_features(self.right_features, self.metric, self.on_device)
                if self.right_features is not None
                else self.left_features.copy()
            )
            self._initialized = True

    # NOTE(review): the `transform` implementation is absent from this copy of the
    # file (lost in extraction); the dangling statement
    # `transform.__doc__ = Operator.transform.__doc__` that referenced it would
    # raise NameError at class-definition time and has been removed. Restore the
    # upstream `transform` method alongside it.

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        # This op consumes exactly its parents' columns; validate they exist.
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        # Each grouped pair (a, b) of input columns produces one output
        # similarity column named "{a}_{b}_sim".
        column_mapping = {}
        for group in col_selector.grouped_names:
            a, b = group
            col_name = f"{a}_{b}_sim"
            column_mapping[col_name] = [a, b]
        return column_mapping

    @property
    def output_tags(self):
        return [Tags.CONTINUOUS]

    @property
    def output_dtype(self):
        # numpy.float was a deprecated alias for the builtin float and was
        # removed in NumPy 1.24; use the concrete float64 dtype instead.
        return numpy.float64
def row_wise_inner_product(a, a_features, b, b_features, on_device=True):
    """Computes the similarity between two columns, by computing the inner product
    along two sparse feature matrices. Both a_features and b_features are
    required to be in canonical CSR format.

    Parameters
    -----------
    a : array of int
        Array of rowids to use in looking up a_features
    a_features: CSR matrix
        Sparse feature matrix
    b : array of int
        Array of rowids to use in looking up in b_features
    b_features: CSR matrix
        Sparse feature matrix
    on_device: bool
        Whether to compute on the GPU or CPU. Computing on the GPU will be
        faster, but requires that the a_features/b_features sparse matrices
        fit into GPU memory.
    """
    # numba can't consume sparse-matrix objects directly, so hand the JIT'd
    # kernels the raw indptr/indices/data arrays of each CSR matrix
    kernel_args = (
        a,
        a_features.indptr,
        a_features.indices,
        a_features.data,
        b,
        b_features.indptr,
        b_features.indices,
        b_features.data,
    )
    if on_device:
        output = cupy.zeros(len(a), dtype=a_features.data.dtype)
        threadsperblock = 32
        blockspergrid = (a.size + (threadsperblock - 1)) // threadsperblock
        _row_wise_inner_product_gpu[blockspergrid, threadsperblock](*kernel_args, output)
    else:
        output = numpy.zeros(len(a), dtype=a_features.data.dtype)
        _row_wise_inner_product_cpu(*kernel_args, output)
    return output
@numba.njit(parallel=True)
def _row_wise_inner_product_cpu(
    a, a_indptr, a_indices, a_data, b, b_indptr, b_indices, b_data, output
):
    # Batch CPU kernel: for each pair of row ids (a[i], b[i]) compute the
    # sparse inner product of those rows and write it to output[i].
    # Compiled with numba and parallelized across rows via prange.
    # https://github.com/PyCQA/pylint/issues/2910
    # pylint: disable=not-an-iterable
    for i in numba.prange(len(a)):
        output[i] = _inner_product_cpu(
            a[i], a_indptr, a_indices, a_data, b[i], b_indptr, b_indices, b_data
        )
@numba.cuda.jit
def _row_wise_inner_product_gpu(
    a, a_indptr, a_indices, a_data, b, b_indptr, b_indices, b_data, output
):
    # Batch GPU kernel: one CUDA thread per output element; thread i computes
    # the sparse inner product of rows a[i] and b[i].
    i = numba.cuda.grid(1)
    # guard against threads launched past the end of the input
    if i < a.size:
        output[i] = _inner_product_gpu(
            a[i], a_indptr, a_indices, a_data, b[i], b_indptr, b_indices, b_data
        )
def _inner_product(a, a_indptr, a_indices, a_data, b, b_indptr, b_indices, b_data):
    # adapted from scipy:
    # https://github.com/scipy/scipy/blob/312b706c1d98980ed140adae943d41f9f7dc08f5/scipy/sparse/sparsetools/csr.h#L780-L854
    a_pos, a_end = a_indptr[a], a_indptr[a + 1]
    b_pos, b_end = b_indptr[b], b_indptr[b + 1]
    similarity = 0.0
    while a_pos < a_end and b_pos < b_end:
        a_j = a_indices[a_pos]
        b_j = b_indices[b_pos]
        if a_j == b_j:
            similarity += a_data[a_pos] * b_data[b_pos]
            a_pos += 1
            b_pos += 1
        elif a_j < b_j:
            a_pos += 1
        else:
            b_pos += 1
    return similarity
# JIT the _inner_product function to run on both CPU/GPU using numba:
# the CPU variant is force-inlined into the prange loop above, the GPU
# variant is a CUDA device function callable from the CUDA kernel.
_inner_product_cpu = numba.njit(inline="always")(_inner_product)
_inner_product_gpu = numba.cuda.jit(device=True, inline=True)(_inner_product)
def _convert_features(features, metric, on_device):
    if on_device:
        # take a shallow copy to avoid mutating the input, but keep gpu
        # memory as low as possible. (also convert to coo_matrix if passed
        # a CSR etc)
        features = coo_matrix(features)
    else:
        if not isinstance(features, scipy.sparse.coo_matrix):
            # convert to host first if the sparse matrix is on the device
            if features.__class__.__module__.startswith("cupy"):
                features = features.get()
            # make sure we're a coo matrix
            if not isinstance(features, scipy.sparse.coo_matrix):
                features = scipy.sparse.coo_matrix(features)
    # Normalizes the matrix so that we can compute the distance metric
    # with only the inner product
    np = cupy if on_device else numpy
    if metric == "tfidf":
        features = _normalize(_tfidf_weight(features.copy(), np), np)
    elif metric == "cosine":
        features = _normalize(features.copy(), np)
    elif metric != "inner":
        raise ValueError(f"unknown distance metric {metric}")
    # we need features in CSR format to do the row lookup
    return features.tocsr()
def _tfidf_weight(X, np):
    N = float(X.shape[0])
    idf = np.log(N / np.bincount(X.col))
    X.data = X.data * idf[X.col]
    return X
def _normalize(X, np):
    X.data = X.data / np.sqrt(np.bincount(X.row, X.data**2))[X.row]
    return X