[ ]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

Getting Started Outbrain: Download and Convert

Overview

The Outbrain dataset was published for the Kaggle Outbrain click prediction competition, in which "Kagglers" were challenged to predict which ads and other forms of sponsored content Outbrain's global users would click on. Here we take the preprocessing and feature engineering pipeline of one of the top finishers and restructure it using NVTabular and cuDF.

[2]:
import os

# Get dataframe library - cudf or pandas
from nvtabular.dispatch import get_lib, random_uniform, reinitialize
df_lib = get_lib()

Download the dataset

First, you need to download the dataset from the Kaggle Outbrain click prediction competition and set DATA_BUCKET_FOLDER to the directory where it is stored.

[3]:
DATA_BUCKET_FOLDER = os.environ.get("INPUT_DATA_DIR", "~/nvt-examples/outbrain/data/")
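If you have not downloaded the data yet, one way to fetch and unpack it is through the Kaggle API. The snippet below is a sketch rather than part of the original notebook: it assumes the kaggle command-line tool is installed and your API credentials are configured, and that the competition slug is outbrain-click-prediction. Depending on how Kaggle packages the files, you may also need to extract the individual *.csv.zip archives.

import glob
import os
import subprocess
import zipfile

data_dir = os.path.expanduser(DATA_BUCKET_FOLDER)
os.makedirs(data_dir, exist_ok=True)

# Download the competition archive into the data folder
# (requires accepting the competition rules on Kaggle first)
subprocess.run(
    ["kaggle", "competitions", "download", "-c", "outbrain-click-prediction", "-p", data_dir],
    check=True,
)

# Unpack any zip archives that were downloaded
for archive in glob.glob(os.path.join(data_dir, "*.zip")):
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(data_dir)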

OUTPUT_BUCKET_FOLDER is the folder where the preprocessed dataset will be saved.

[4]:
OUTPUT_BUCKET_FOLDER = os.environ.get("OUTPUT_DATA_DIR", "./outbrain-preprocessed/")
os.makedirs(OUTPUT_BUCKET_FOLDER, exist_ok=True)

Preparing Our Dataset

Here, we merge the component tables of our dataset into a single data frame using cuDF, a GPU DataFrame library for loading, joining, aggregating, filtering, and otherwise manipulating data. We do this because NVTabular applies a workflow to a single table. We also re-initialize managed memory: rmm.reinitialize() provides an easy way to initialize RMM (the RAPIDS Memory Manager) with specific memory resource options across multiple devices, and enabling managed memory here lets us perform the memory-intensive merge operations. Note that dask-cudf could also be used here; a sketch of that variant follows the merge cell below.
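For reference, the reinitialize helper used in the next cell corresponds (when cudf is available) to calling RMM directly, roughly as in the sketch below. The keyword arguments shown are standard RMM options and are included only for illustration; they are not taken from this notebook.

import rmm

# Enable managed (unified) memory so allocations can spill between GPU and host memory;
# `devices` selects which GPUs to configure and accepts a single id or a list of ids.
rmm.reinitialize(
    managed_memory=True,
    pool_allocator=False,
    devices=[0],
)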

[5]:
# use managed memory for device memory allocation
reinitialize(managed_memory=True)

# Alias for read_csv
read_csv = df_lib.read_csv

# Merge all the CSV files together
documents_meta = read_csv(DATA_BUCKET_FOLDER + "documents_meta.csv", na_values=["\\N", ""])
merged = (
    read_csv(DATA_BUCKET_FOLDER + "clicks_train.csv", na_values=["\\N", ""])
    .merge(
        read_csv(DATA_BUCKET_FOLDER + "events.csv", na_values=["\\N", ""]),
        on="display_id",
        how="left",
        suffixes=("", "_event"),
    )
    .merge(
        read_csv(DATA_BUCKET_FOLDER + "promoted_content.csv", na_values=["\\N", ""]),
        on="ad_id",
        how="left",
        suffixes=("", "_promo"),
    )
    .merge(documents_meta, on="document_id", how="left")
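    # Join documents_meta a second time, keyed on the promoted (ad) document;
    # duplicated column names receive the "_promo" suffix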
    .merge(
        documents_meta,
        left_on="document_id_promo",
        right_on="document_id",
        how="left",
        suffixes=("", "_promo"),
    )
)
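As mentioned above, dask-cudf can express the same joins when the merged table does not fit comfortably in GPU memory. A rough sketch of the first join (not part of the original notebook; the remaining merges follow the same pattern) would look like this:

import dask_cudf

# Lazily read and join the click and event logs; .compute() materializes a cudf DataFrame
clicks_ddf = dask_cudf.read_csv(DATA_BUCKET_FOLDER + "clicks_train.csv", na_values=["\\N", ""])
events_ddf = dask_cudf.read_csv(DATA_BUCKET_FOLDER + "events.csv", na_values=["\\N", ""])
merged_ddf = clicks_ddf.merge(events_ddf, on="display_id", how="left", suffixes=("", "_event"))
merged_dask = merged_ddf.compute()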

Splitting into train and validation datasets

We use a time-stratified sample to create a validation set that is more recent, and we save both the train and validation sets to parquet files to be read by NVTabular. Note that you should run the cell below only once and then save the resulting train and validation data frames as parquet files; if you rerun this notebook, you may end up with a different train-validation split each time because the samples are drawn from a uniform distribution without a fixed seed.

[6]:
# Do a stratified split of the merged dataset into a training/validation dataset
merged["day_event"] = (merged["timestamp"] / 1000 / 60 / 60 / 24).astype(int)
random_state = df_lib.Series(random_uniform(size=len(merged)))
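# Map each row to 1 (training: first 11 days, ~80% random sample) or 0 (validation: everything else);
# scatter_by_map returns one data frame per map value: 0 -> valid_set, 1 -> train_set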
valid_set, train_set = merged.scatter_by_map(
    ((merged.day_event <= 10) & (random_state > 0.2)).astype(int)
)
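If you want the split to be reproducible across reruns, one option (a sketch, not part of the original notebook) is to draw the uniform samples from a seeded NumPy generator instead of random_uniform:

import numpy as np

rng = np.random.default_rng(42)  # fixed seed -> identical split on every run
random_state = df_lib.Series(rng.uniform(size=len(merged)))
valid_set, train_set = merged.scatter_by_map(
    ((merged.day_event <= 10) & (random_state > 0.2)).astype(int)
)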
[7]:
train_set.head()
[7]:
(index) | display_id | ad_id | clicked | uuid | document_id | timestamp | platform | geo_location | document_id_promo | campaign_id | advertiser_id | source_id | publisher_id | publish_time | source_id_promo | publisher_id_promo | publish_time_promo | day_event
9376 | 15071 | 50640 | 0 | ef25dd3a359f77 | 91525 | 1053684 | 1.0 | AU>02 | 869942 | 6714 | 1913 | 8107.0 | 9.0 | 2012-02-13 00:00:00 | 8484.0 | <NA> | 2015-09-13 21:00:00 | 0
11904 | 1545 | 75959 | 0 | 7be3eb33be486c | 1697627 | 107138 | 1.0 | US>MA>521 | 1078193 | 959 | 8 | 8610.0 | 1142.0 | <NA> | 1493.0 | 305.0 | 2016-02-04 00:00:00 | 0
12512 | 3544 | 149756 | 0 | 33e08ba8e59293 | 1326664 | 243025 | 1.0 | US>TX>623 | 1233590 | 19007 | 285 | 948.0 | 450.0 | <NA> | 10972.0 | 1147.0 | 2016-04-01 00:00:00 | 0
15072 | 229 | 150817 | 0 | fc514cfac3fd61 | 1776403 | 16513 | 1.0 | US>AZ>789 | 1362397 | 19188 | 2407 | 1095.0 | 1004.0 | 2016-06-13 16:00:00 | 8610.0 | 1142.0 | 2016-04-28 15:00:00 | 0
15584 | 1534 | 116696 | 0 | da4d73f0a12bfa | 1503347 | 106591 | 2.0 | US>CA>866 | 1191113 | 15005 | 83 | 5792.0 | 704.0 | 2016-05-31 00:00:00 | 7357.0 | <NA> | <NA> | 0

We save the train and validation sets to disk as parquet files.

[8]:
train_filename = os.path.join(OUTPUT_BUCKET_FOLDER, "train_gdf.parquet")
valid_filename = os.path.join(OUTPUT_BUCKET_FOLDER, "valid_gdf.parquet")
train_set.to_parquet(train_filename, compression=None)
valid_set.to_parquet(valid_filename, compression=None)
merged = train_set = valid_set = None
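The parquet files written above are what the NVTabular preprocessing workflow consumes in the next step. As a quick sanity check, you could point an NVTabular Dataset at them, roughly like this (a sketch, assuming nvtabular is importable and sufficient GPU memory is free):

import nvtabular as nvt

# Lazily open the saved parquet files; to_ddf() exposes them as a dask DataFrame for inspection
train_dataset = nvt.Dataset(train_filename, engine="parquet")
valid_dataset = nvt.Dataset(valid_filename, engine="parquet")
print(train_dataset.to_ddf().head())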
[9]:
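# Switch back to the default (non-managed) device memory now that the memory-intensive merges are done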
reinitialize(managed_memory=False)