[1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

Conversion Script for Criteo Dataset (CSV-to-Parquet)

This notebook converts the raw tab-separated Criteo day files (day_0, day_1, ...) into Parquet files so that they can be processed efficiently by NVTabular.

Step 1: Import libraries

[2]:
import os
import rmm
import cudf
from cudf.io.parquet import ParquetWriter
from fsspec.core import get_fs_token_paths
import numpy as np
import pyarrow.parquet as pq

from dask.dataframe.io.parquet.utils import _analyze_paths
from dask.base import tokenize
from dask.utils import natural_sort_key
from dask.highlevelgraph import HighLevelGraph
from dask.delayed import Delayed
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

import nvtabular as nvt
from nvtabular.utils import device_mem_size, get_rmm_size

Step 2: Specify options

Specify the input and output paths, unless the INPUT_DATA_DIR and OUTPUT_DATA_DIR environment variables are already set. To use a multi-GPU system, set allow_multi_gpu=True and check that your CUDA_VISIBLE_DEVICES environment variable lists the devices you want to use.

[3]:
INPUT_PATH = os.environ.get('INPUT_DATA_DIR', '/datasets/criteo/crit_orig')
OUTPUT_PATH = os.environ.get('OUTPUT_DATA_DIR', '/raid/criteo/tests/demo_out')
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
n_workers = len(CUDA_VISIBLE_DEVICES.split(","))
frac_size = 0.15
allow_multi_gpu = False
use_rmm_pool = False
max_day = None  # (Optional) Limit the dataset to the first max_day day files for debugging
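
Before launching the conversion, it can help to confirm that the input path exists and that the output directory is present. The cell below is an optional sketch, not part of the original script, that performs both checks with fsspec, assuming the paths point to a local filesystem.

[ ]:
import fsspec

# Optional sanity check (an added sketch, assuming local paths): verify the
# input path and create the output directory before the conversion starts.
local_fs = fsspec.filesystem("file")
assert local_fs.exists(INPUT_PATH), f"Input path not found: {INPUT_PATH}"
local_fs.makedirs(OUTPUT_PATH, exist_ok=True)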

Step 3: Define helper/task functions

[4]:
def _pool(frac=0.8):
    # Initialize an RMM memory pool using a fraction of the device memory
    rmm.reinitialize(
        pool_allocator=True,
        initial_pool_size=get_rmm_size(frac * device_mem_size()),
    )

def _convert_file(path, name, out_dir, frac_size, fs, cols, dtypes):
    fn = f"{name}.parquet"
    out_path = fs.sep.join([out_dir, fn])
    writer = ParquetWriter(out_path, compression=None)
    # Stream the raw day file in GPU-memory-sized chunks and append each
    # chunk to the Parquet file
    for gdf in nvt.Dataset(
        path,
        engine="csv",
        names=cols,
        part_memory_fraction=frac_size,
        sep='\t',
        dtypes=dtypes,
    ).to_iter():
        writer.write_table(gdf)
        del gdf
    # Close the writer and return the footer metadata so it can later be
    # merged into a global _metadata file
    md = writer.close(metadata_file_path=fn)
    return md

def _write_metadata(md_list, fs, path):
    # Merge the per-file Parquet footers and write a global _metadata file
    # for the converted dataset
    if md_list:
        metadata_path = fs.sep.join([path, "_metadata"])
        _meta = (
            cudf.io.merge_parquet_filemetadata(md_list)
            if len(md_list) > 1
            else md_list[0]
        )
        with fs.open(metadata_path, "wb") as fil:
            _meta.tofile(fil)
    return True
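
Each _convert_file call streams one raw day file through nvt.Dataset in GPU-memory-sized chunks, appends every chunk to a Parquet file, and returns the footer metadata so that _write_metadata can merge all footers into a single _metadata file. To smoke-test a single day before building the full graph, a sketch along the following lines could be used; the day_0 file name and the inline column/dtype setup simply duplicate the definitions from Step 5 and are shown here only for illustration.

[ ]:
# Optional single-file smoke test (a sketch; mirrors the setup in Step 5).
# Uncomment to convert day_0 eagerly and inspect the returned footer metadata.
# test_fs = get_fs_token_paths(INPUT_PATH, mode="rb")[0]
# test_cols = (
#     ["label"]
#     + ["I" + str(x) for x in range(1, 14)]
#     + ["C" + str(x) for x in range(1, 27)]
# )
# test_dtypes = {c: np.int32 for c in test_cols[:14]}
# test_dtypes.update({c: "hex" for c in test_cols[14:]})
# md = _convert_file(
#     test_fs.sep.join([INPUT_PATH, "day_0"]), "day_0", OUTPUT_PATH,
#     frac_size, test_fs, test_cols, test_dtypes,
# )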

Step 4: (Optionally) Start a Dask cluster

[5]:
# Start up cluster if we have multiple devices
# (and `allow_multi_gpu == True`)
client = None
if n_workers > 1 and allow_multi_gpu:
    cluster = LocalCUDACluster(
        n_workers=n_workers,
        CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,
    )
    client = Client(cluster)
    if use_rmm_pool:
        client.run(_pool)
elif use_rmm_pool:
    _pool()
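
When a cluster was started, the Dask dashboard is the easiest way to watch the per-file conversion tasks. The optional cell below, an addition that is not part of the original script, prints the dashboard URL if a client exists.

[ ]:
# Optional: print the Dask dashboard URL for monitoring the conversion tasks
# (does nothing in the single-GPU / synchronous case)
if client is not None:
    print("Dask dashboard:", client.dashboard_link)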

Step 5: Main conversion script (build Dask task graph)

[6]:
fs = get_fs_token_paths(INPUT_PATH, mode="rb")[0]
# List the raw day_* files, skipping any Parquet files already in the directory
file_list = [
    x for x in fs.glob(fs.sep.join([INPUT_PATH, "day_*"]))
    if not x.endswith("parquet")
]
file_list = sorted(file_list, key=natural_sort_key)
file_list = file_list[:max_day] if max_day else file_list
name_list = _analyze_paths(file_list, fs)[1]

cont_names = ["I" + str(x) for x in range(1, 14)]
cat_names = ["C" + str(x) for x in range(1, 27)]
cols = ["label"] + cont_names + cat_names

dtypes = {}
dtypes["label"] = np.int32
for x in cont_names:
    dtypes[x] = np.int32
for x in cat_names:
    # The raw categorical columns are hexadecimal strings; the "hex" dtype
    # parses them into integers during the CSV read
    dtypes[x] = "hex"

dsk = {}
token = tokenize(file_list, name_list, OUTPUT_PATH, frac_size, fs, cols, dtypes)
convert_file_name = "convert_file-" + token
# One conversion task per day file
for i, (path, name) in enumerate(zip(file_list, name_list)):
    key = (convert_file_name, i)
    dsk[key] = (_convert_file, path, name, OUTPUT_PATH, frac_size, fs, cols, dtypes)

write_meta_name = "write-metadata-" + token
# Final task: merge the per-file footers into a single _metadata file
dsk[write_meta_name] = (
    _write_metadata,
    [(convert_file_name, i) for i in range(len(file_list))],
    fs,
    OUTPUT_PATH,
)
graph = HighLevelGraph.from_collections(write_meta_name, dsk, dependencies=[])
conversion_delayed = Delayed(write_meta_name, graph)
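
The resulting graph contains one _convert_file task per day file plus a single _write_metadata task that depends on all of them. The optional cell below, added for illustration, reports the task count; conversion_delayed.visualize() can also render the graph if graphviz is installed.

[ ]:
# Optional: inspect the task graph before executing it. There should be one
# conversion task per day file plus one final metadata-writing task.
print(f"{len(file_list)} input files -> {len(dsk)} tasks")
# conversion_delayed.visualize()  # requires graphviz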

Step 6: Execute conversion

[7]:
%%time
if client:
    conversion_delayed.compute()
else:
    conversion_delayed.compute(scheduler="synchronous")
CPU times: user 8min 48s, sys: 12min 53s, total: 21min 41s
Wall time: 30min 51s
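
After the conversion finishes, OUTPUT_PATH should contain one day_*.parquet file per input file plus the merged _metadata footer. The cell below is an optional verification sketch, an addition that reuses the pyarrow import from Step 1 and assumes the output lives on a local filesystem.

[ ]:
# Optional verification: read the merged _metadata footer and report totals
meta = pq.read_metadata(fs.sep.join([OUTPUT_PATH, "_metadata"]))
print(f"{meta.num_rows} rows in {meta.num_row_groups} row groups")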