# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.
Scaling Criteo: Download and Convert#
This notebook is created using the latest stable merlin-hugectr, merlin-tensorflow, or merlin-pytorch container.
Criteo 1TB Click Logs dataset#
The Criteo 1TB Click Logs dataset is the largest public available dataset for recommender system. It contains ~1.3 TB of uncompressed click logs containing over four billion samples spanning 24 days. Each record contains 40 features: one label indicating a click or no click, 13 numerical figures, and 26 categorical features. The dataset is provided by CriteoLabs. A subset of 7 days was used in this Kaggle Competition. We will use the dataset as an example how to scale ETL, Training and Inference.
First, we will download the data and extract it. We define the base directory for the dataset and the numbers of day. Criteo provides 24 days. We will use the last day as validation dataset and the remaining days as training.
Each day has a size of ~15GB compressed .gz
and uncompressed ~47GB. You can define a smaller subset of days, if you like. Each day takes ~20-30min to download and extract it.
import os
from merlin.core.utils import download_file
download_criteo = True
BASE_DIR = os.environ.get("BASE_DIR", "/raid/data/criteo")
input_path = os.path.join(BASE_DIR, "crit_orig")
NUMBER_DAYS = os.environ.get("NUMBER_DAYS", 2)
We create the folder structure and download and extract the files. If the file already exist, it will be skipped.
%%time
if download_criteo:
# Test if NUMBER_DAYS in valid range
if NUMBER_DAYS < 2 or NUMBER_DAYS > 23:
raise ValueError(
str(NUMBER_DAYS) +
" is not supported. A minimum of 2 days are " +
"required and a maximum of 24 (0-23 days) are available"
)
# Create BASE_DIR if not exists
if not os.path.exists(BASE_DIR):
os.makedirs(BASE_DIR)
# Create input dir if not exists
if not os.path.exists(input_path):
os.makedirs(input_path)
# Iterate over days
for i in range(0, NUMBER_DAYS):
file = os.path.join(input_path, "day_" + str(i) + ".gz")
# Download file, if there is no .gz, .csv or .parquet file
if not (
os.path.exists(file)
or os.path.exists(
file.replace(".gz", ".parquet").replace("crit_orig", "converted/criteo/")
)
or os.path.exists(file.replace(".gz", ""))
):
download_file(
"https://storage.googleapis.com/criteo-cail-datasets/day_" +
str(i) +
".gz",
file,
)
downloading day_0.gz: 16.3GB [03:16, 83.0MB/s]
downloading day_1.gz: 16.7GB [04:32, 61.3MB/s]
CPU times: user 15min 12s, sys: 3min 48s, total: 19min
Wall time: 23min 25s
The original dataset is in text format. We will convert the dataset into .parquet
format. Parquet is a compressed, column-oriented file structure and requires less disk space.
Conversion Script for Criteo Dataset (CSV-to-Parquet)#
Import libraries
import os
import glob
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import nvtabular as nvt
from merlin.core.utils import device_mem_size, get_rmm_size
Specify options
Specify the input and output paths, unless the INPUT_DATA_DIR
and OUTPUT_DATA_DIR
environment variables are already set. For multi-GPU systems, check that the CUDA_VISIBLE_DEVICES
environment variable includes all desired device IDs.
INPUT_PATH = os.environ.get("INPUT_DATA_DIR", input_path)
OUTPUT_PATH = os.environ.get("OUTPUT_DATA_DIR", os.path.join(BASE_DIR, "converted"))
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
frac_size = 0.10
(Optionally) Start a Dask cluster
cluster = None # Connect to existing cluster if desired
if cluster is None:
cluster = LocalCUDACluster(
CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,
rmm_pool_size=get_rmm_size(0.8 * device_mem_size()),
local_directory=os.path.join(OUTPUT_PATH, "dask-space"),
)
client = Client(cluster)
2022-12-06 09:52:35,564 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
Convert original data to an NVTabular Dataset
# Specify column names
cont_names = ["I" + str(x) for x in range(1, 14)]
cat_names = ["C" + str(x) for x in range(1, 27)]
cols = ["label"] + cont_names + cat_names
# Specify column dtypes. Note that "hex" means that
# the values will be hexadecimal strings that should
# be converted to int32
dtypes = {}
dtypes["label"] = np.int32
for x in cont_names:
dtypes[x] = np.int32
for x in cat_names:
dtypes[x] = "hex"
# Create an NVTabular Dataset from a CSV-file glob
file_list = glob.glob(os.path.join(INPUT_PATH, "day_*[!.gz]"))
dataset = nvt.Dataset(
file_list,
engine="csv",
names=cols,
part_mem_fraction=frac_size,
sep="\t",
dtypes=dtypes,
client=client,
)
/usr/local/lib/python3.8/dist-packages/merlin/core/utils.py:384: FutureWarning: The `client` argument is deprecated from Dataset and will be removed in a future version of NVTabular. By default, a global client in the same python context will be detected automatically, and `merlin.utils.set_dask_client` (as well as `Distributed` and `Serial`) can be used for explicit control.
warnings.warn(
Write Dataset to Parquet
dataset.to_parquet(
os.path.join(OUTPUT_PATH, "criteo"),
preserve_files=True,
)
You can delete the original criteo files as they require a lot of disk space.