[1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
Conversion Script for Criteo Dataset (CSV-to-Parquet)
Step 1: Import libraries
[2]:
import os
import rmm
import cudf
from cudf.io.parquet import ParquetWriter
from fsspec.core import get_fs_token_paths
import numpy as np
import pyarrow.parquet as pq
from dask.dataframe.io.parquet.utils import _analyze_paths
from dask.base import tokenize
from dask.utils import natural_sort_key
from dask.highlevelgraph import HighLevelGraph
from dask.delayed import Delayed
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import nvtabular as nvt
from nvtabular.utils import device_mem_size, get_rmm_size
Step 2: Specify options
Specify the input and output paths, unless the INPUT_DATA_DIR and OUTPUT_DATA_DIR environment variables are already set. In order to utilize a multi-GPU system, be sure to specify allow_multi_gpu = True (and check the setting of your CUDA_VISIBLE_DEVICES environment variable).
[3]:
INPUT_PATH = os.environ.get('INPUT_DATA_DIR', '/datasets/criteo/crit_orig')
OUTPUT_PATH = os.environ.get('OUTPUT_DATA_DIR', '/raid/criteo/tests/demo_out')
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
n_workers = len(CUDA_VISIBLE_DEVICES.split(","))
frac_size = 0.15
allow_multi_gpu = False
use_rmm_pool = False
max_day = None  # (Optional) Limit the dataset to the first `max_day` days for debugging
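Before building anything, it can be worth a quick sanity check that the raw day_* files are actually visible at INPUT_PATH. This is a minimal sketch (not part of the original flow), and it assumes the input lives on a local filesystem:

import glob
# List the raw Criteo files the conversion graph will pick up
raw_files = sorted(glob.glob(os.path.join(INPUT_PATH, "day_*")))
print(f"Found {len(raw_files)} raw files under {INPUT_PATH}")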
Step 3: Define helper/task functions
[4]:
def _pool(frac=0.8):
    # Initialize an RMM memory pool sized to a fraction of total device memory
    rmm.reinitialize(
        pool_allocator=True,
        initial_pool_size=get_rmm_size(frac * device_mem_size()),
    )

def _convert_file(path, name, out_dir, frac_size, fs, cols, dtypes):
    # Stream one tab-separated file through cudf in device-memory-sized
    # chunks, append each chunk to a Parquet file, and return the footer
    # metadata collected by the writer
    fn = f"{name}.parquet"
    out_path = fs.sep.join([out_dir, fn])
    writer = ParquetWriter(out_path, compression=None)
    for gdf in nvt.Dataset(
        path,
        engine="csv",
        names=cols,
        part_memory_fraction=frac_size,
        sep="\t",
        dtypes=dtypes,
    ).to_iter():
        writer.write_table(gdf)
        del gdf
    md = writer.close(metadata_file_path=fn)
    return md

def _write_metadata(md_list, fs, path):
    # Merge the per-file footers and write a global "_metadata" file
    if md_list:
        metadata_path = fs.sep.join([path, "_metadata"])
        _meta = (
            cudf.io.merge_parquet_filemetadata(md_list)
            if len(md_list) > 1
            else md_list[0]
        )
        with fs.open(metadata_path, "wb") as fil:
            _meta.tofile(fil)
    return True
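The helpers above lean on cudf's ParquetWriter: each _convert_file call returns the serialized footer of the file it wrote, and _write_metadata merges those footers into one dataset-level _metadata file. A minimal sketch of that round trip on a toy table (illustrative only; the /tmp path is hypothetical):

# Illustrative only: write one small table and capture its footer metadata
w = ParquetWriter("/tmp/example.parquet", compression=None)
w.write_table(cudf.DataFrame({"a": [1, 2, 3]}))
md = w.close(metadata_file_path="example.parquet")
# With several such footers, they would be merged before writing "_metadata":
# merged = cudf.io.merge_parquet_filemetadata([md, ...])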
Step 4: (Optionally) Start a Dask cluster
[5]:
# Start up a cluster if we have multiple devices
# (and `allow_multi_gpu == True`)
client = None
if n_workers > 1 and allow_multi_gpu:
    cluster = LocalCUDACluster(
        n_workers=n_workers,
        CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,
    )
    client = Client(cluster)
    if use_rmm_pool:
        # Initialize the RMM pool on every worker
        client.run(_pool)
elif use_rmm_pool:
    # Single-GPU case: initialize the pool in this process
    _pool()
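If a cluster was started, the Dask dashboard is handy for watching the conversion tasks; the distributed Client exposes its address:

if client:
    print(client.dashboard_link)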
Step 5: Main conversion script (build Dask task graph)
[6]:
fs = get_fs_token_paths(INPUT_PATH, mode="rb")[0]
file_list = [
    x for x in fs.glob(fs.sep.join([INPUT_PATH, "day_*"]))
    if not x.endswith("parquet")
]
file_list = sorted(file_list, key=natural_sort_key)
file_list = file_list[:max_day] if max_day else file_list
name_list = _analyze_paths(file_list, fs)[1]

# Criteo schema: a label, 13 continuous ("I*") columns,
# and 26 categorical ("C*") columns
cont_names = ["I" + str(x) for x in range(1, 14)]
cat_names = ["C" + str(x) for x in range(1, 27)]
cols = ["label"] + cont_names + cat_names

dtypes = {}
dtypes["label"] = np.int32
for x in cont_names:
    dtypes[x] = np.int32
for x in cat_names:
    dtypes[x] = "hex"

# Build the task graph: one conversion task per raw file, plus a final
# task that aggregates the footers and writes the global "_metadata"
dsk = {}
token = tokenize(file_list, name_list, OUTPUT_PATH, frac_size, fs, cols, dtypes)
convert_file_name = "convert_file-" + token
for i, (path, name) in enumerate(zip(file_list, name_list)):
    key = (convert_file_name, i)
    dsk[key] = (_convert_file, path, name, OUTPUT_PATH, frac_size, fs, cols, dtypes)

write_meta_name = "write-metadata-" + token
dsk[write_meta_name] = (
    _write_metadata,
    [(convert_file_name, i) for i in range(len(file_list))],
    fs,
    OUTPUT_PATH,
)
graph = HighLevelGraph.from_collections(write_meta_name, dsk, dependencies=[])
conversion_delayed = Delayed(write_meta_name, graph)
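Nothing has executed yet; conversion_delayed only wraps the task graph. Before computing, its shape can be confirmed with a quick count (one conversion task per raw file plus the metadata write):

print(f"{len(dsk)} tasks: {len(file_list)} conversions + 1 metadata write")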
Step 6: Execute conversion
[7]:
%%time
if client:
    conversion_delayed.compute()
else:
    conversion_delayed.compute(scheduler="synchronous")
CPU times: user 8min 48s, sys: 12min 53s, total: 21min 41s
Wall time: 30min 51s
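After the conversion completes, the aggregated _metadata file can be checked with pyarrow (already imported as pq) to confirm the row and row-group counts. A minimal sketch:

md = pq.read_metadata(os.path.join(OUTPUT_PATH, "_metadata"))
print(f"{md.num_rows} rows in {md.num_row_groups} row groups")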