Source code for lale.datasets.multitable.fetch_datasets

# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import urllib.request

import numpy as np
import pandas as pd

import lale.datasets.openml
from lale.datasets.data_schemas import add_table_name
from lale.helpers import datatype_param_type

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

try:
    from pyspark.sql import SparkSession

    from lale.datasets.data_schemas import (  # pylint:disable=ungrouped-imports
        SparkDataFrameWithIndex,
    )

    spark_installed = True
except ImportError:
    spark_installed = False


[docs]def get_data_from_csv(datatype: datatype_param_type, data_file_name): datatype = datatype.casefold() # type: ignore if datatype == "pandas": return pd.read_csv(data_file_name) elif datatype == "spark": if spark_installed: spark = SparkSession.builder.appName("GoSales Dataset").getOrCreate() # type: ignore df = spark.read.options(inferSchema="True", delimiter=",").csv( data_file_name, header=True ) return SparkDataFrameWithIndex(df) else: raise ValueError("Spark is not installed on this machine.") else: raise ValueError( "Can fetch the go_sales data in pandas or spark dataframes only. Pass either 'pandas' or 'spark' in datatype parameter." )
[docs]def fetch_go_sales_dataset(datatype: datatype_param_type = "pandas"): """ Fetches the Go_Sales dataset from IBM's Watson's ML samples. It contains information about daily sales, methods, retailers and products of a company in form of 5 CSV files. This method downloads and stores these 5 CSV files under the 'lale/lale/datasets/multitable/go_sales_data' directory. It creates this directory by itself if it does not exists. Dataset URL: https://github.com/IBM/watson-machine-learning-samples/raw/master/cloud/data/go_sales/ Parameters ---------- datatype : string, optional, default 'pandas' If 'pandas', Returns a list of singleton dictionaries (each element of the list is one table from the dataset) after reading the downloaded CSV files. The key of each dictionary is the name of the table and the value contains a pandas dataframe consisting of the data. If 'spark', Returns a list of singleton dictionaries (each element of the list is one table from the dataset) after reading the downloaded CSV files. The key of each dictionary is the name of the table and the value contains a spark dataframe consisting of the data extended with an index column. Else, Throws an error as it does not support any other return type. Returns ------- go_sales_list : list of singleton dictionary of pandas / spark dataframes """ download_data_dir = os.path.join(os.path.dirname(__file__), "go_sales_data") base_url = "https://github.com/IBM/watson-machine-learning-samples/raw/master/cloud/data/go_sales/" filenames = [ "go_1k.csv", "go_daily_sales.csv", "go_methods.csv", "go_products.csv", "go_retailers.csv", ] go_sales_list = [] for file in filenames: data_file_name = os.path.join(download_data_dir, file) if not os.path.exists(data_file_name): if not os.path.exists(download_data_dir): os.makedirs(download_data_dir) # this request is to a hardcoded https url, so does not risk leaking local data urllib.request.urlretrieve(base_url + file, data_file_name) # nosec logger.info(f" Created: {data_file_name}") table_name = file.split(".", maxsplit=1)[0] data_frame = get_data_from_csv(datatype, data_file_name) go_sales_list.append(add_table_name(data_frame, table_name)) logger.info(" Fetched the Go_Sales dataset. Process completed.") return go_sales_list
[docs]def fetch_imdb_dataset(datatype: datatype_param_type = "pandas"): """ Fetches the IMDB movie dataset from Relational Dataset Repo. It contains information about directors, actors, roles and genres of multiple movies in form of 7 CSV files. This method downloads and stores these 7 CSV files under the 'lale/lale/datasets/multitable/imdb_data' directory. It creates this directory by itself if it does not exists. Dataset URL: https://relational.fit.cvut.cz/dataset/IMDb Parameters ---------- datatype : string, optional, default 'pandas' If 'pandas', Returns a list of singleton dictionaries (each element of the list is one table from the dataset) after reading the already existing CSV files. The key of each dictionary is the name of the table and the value contains a pandas dataframe consisting of the data. If 'spark', Returns a list of singleton dictionaries (each element of the list is one table from the dataset) after reading the downloaded CSV files. The key of each dictionary is the name of the table and the value contains a spark dataframe consisting of the data extended with an index column. Else, Throws an error as it does not support any other return type. Returns ------- imdb_list : list of singleton dictionary of pandas / spark dataframes Raises ------ jsonschema.ValueError dataset not found """ download_data_dir = os.path.join(os.path.dirname(__file__), "imdb_data") imdb_list = [] if not os.path.exists(download_data_dir): raise ValueError( f"IMDB dataset not found at {download_data_dir}. Please download it using lalegpl repository." ) for _root, _dirs, files in os.walk(download_data_dir): for file in files: filename, extension = os.path.splitext(file) if extension == ".csv": data_file_name = os.path.join(download_data_dir, file) table_name = filename data_frame = get_data_from_csv(datatype, data_file_name) imdb_list.append(add_table_name(data_frame, table_name)) if len(imdb_list) == 7: logger.info(" Fetched the IMDB dataset. Process completed.") else: raise ValueError( f"Incomplete IMDB dataset found at {download_data_dir}. Please download complete dataset using lalegpl repository." ) return imdb_list
[docs]def fetch_creditg_multitable_dataset(datatype: datatype_param_type = "pandas"): """ Fetches credit-g dataset from OpenML, but in a multi-table format. It transforms the [credit-g](https://www.openml.org/d/31) dataset from OpenML to a multi-table format. We split the dataset into 3 tables: `loan_application`, `bank_account_info` and `existing_credits_info`. The table `loan_application` serves as our primary table, and we treat the other two tables as providing additional information related to the applicant's bank account and existing credits. As one can see, this is very close to a real life scenario where information is present in multiple tables in normalized forms. We created a primary key column `id` as a proxy to the loan applicant's identity number. Parameters ---------- datatype : string, optional, default 'pandas' If 'pandas', Returns a list of singleton dictionaries (each element of the list is one table from the dataset) after reading the downloaded CSV files. The key of each dictionary is the name of the table and the value contains a pandas dataframe consisting of the data. Returns ------- dataframes_list : list of singleton dictionary of pandas dataframes """ (train_X, train_y), (test_X, test_y) = lale.datasets.openml.fetch( "credit-g", "classification", preprocess=False ) # vstack train and test X = pd.concat([train_X, test_X], axis=0) y = pd.concat([train_y, test_y], axis=0) bank_account_columns = ["checking_status", "savings_status"] loan_application_columns = [ "duration", "credit_history", "purpose", "credit_amount", "employment", "installment_commitment", "personal_status", "other_parties", "residence_since", "property_magnitude", "age", "other_payment_plans", "housing", "job", "num_dependents", "own_telephone", "foreign_worker", ] dataframes_list = [] bank_acc_df = X[bank_account_columns] bank_acc_df = bank_acc_df.copy() bank_acc_df.insert(0, "id", bank_acc_df.index) dataframes_list.append(add_table_name(bank_acc_df, "bank_account_info")) loan_application_df = X[loan_application_columns] loan_application_df = loan_application_df.copy() loan_application_df.insert(0, "id", loan_application_df.index) loan_application_df["class"] = y loan_application_df.iloc[2, 7] = "M single" loan_application_df.iloc[996, 7] = "M single" loan_application_df.iloc[998, 7] = "F div/dep/mar" dataframes_list.append(add_table_name(loan_application_df, "loan_application")) # existing credits is a fake table we are adding, so a join and count can create the `existing_credits` column df_col = X["existing_credits"] records = [] for row in df_col.iteritems(): row_id = row[0] credit_count = int(row[1]) for _i in range(credit_count): records.append( { "id": row_id, "credit_number": np.random.randint(1, 1000000), "type": "credit", "status": "on", } ) existing_credits_df = pd.DataFrame.from_records(records) dataframes_list.append(add_table_name(existing_credits_df, "existing_credits_info")) return dataframes_list