Source code for lale.lib.rasl.datasets

# Copyright 2022 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable, Literal, Tuple, Union, cast, overload

import pandas as pd
import sklearn.model_selection
import sklearn.tree
from typing_extensions import TypeAlias

import lale.helpers
from lale.datasets import pandas2spark
from lale.helpers import datatype_param_type

from .split_xy import SplitXy

_PandasBatch: TypeAlias = Tuple[pd.DataFrame, pd.Series]

if lale.helpers.spark_installed:
    from pyspark.sql.dataframe import DataFrame as SparkDataFrame

    _SparkBatch: TypeAlias = Tuple[SparkDataFrame, SparkDataFrame]

    _PandasOrSparkBatchAux = Union[
        _PandasBatch,
        _SparkBatch,
    ]

else:
    _PandasOrSparkBatchAux = _PandasBatch  # type: ignore

# pyright does not currently accept a TypeAlias with conditional definitions
_PandasOrSparkBatch: TypeAlias = _PandasOrSparkBatchAux  # type: ignore

try:
    import arff

    from lale.datasets.openml import openml_datasets  # pylint:disable=ungrouped-imports

    liac_arff_installed = True
except ModuleNotFoundError:
    liac_arff_installed = False


def arff_data_loader(
    file_name: str, label_name: str, rows_per_batch: int
) -> Iterable[_PandasBatch]:
    """Incrementally load an ARFF file and yield it one (X, y) batch at a time."""
    assert liac_arff_installed
    split_x_y = SplitXy(label_name=label_name)

    def make_batch():
        start = n_batches * rows_per_batch
        stop = start + len(row_list)
        df = pd.DataFrame(row_list, range(start, stop), column_names)
        X, y = split_x_y.transform_X_y(df, None)
        return X, y

    with open(file_name) as f:  # pylint:disable=unspecified-encoding
        arff_dict = arff.load(f, return_type=arff.DENSE_GEN)
        column_names = [name.lower() for name, _ in arff_dict["attributes"]]
        row_list = []
        n_batches = 0
        for row in arff_dict["data"]:
            row_list.append(row)
            if len(row_list) >= rows_per_batch:
                yield make_batch()
                row_list = []
                n_batches += 1
        if len(row_list) > 0:  # last chunk
            yield make_batch()
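A minimal usage sketch for the ARFF loader, assuming liac-arff is installed and that a local file named weather.arff with a label column called play exists (both names are hypothetical); each iteration yields one pandas (X, y) batch of at most 100 rows:

for X_batch, y_batch in arff_data_loader("weather.arff", "play", rows_per_batch=100):
    print(X_batch.shape, y_batch.shape)  # features and labels for one batch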
def csv_data_loader(
    file_name: str, label_name: str, rows_per_batch: int
) -> Iterable[_PandasBatch]:
    """Incrementally load a CSV file and yield it one (X, y) batch at a time."""
    split_x_y = SplitXy(label_name=label_name)
    with pd.read_csv(file_name, chunksize=rows_per_batch) as reader:
        for df in reader:
            X, y = split_x_y.transform_X_y(df, None)
            yield X, y
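The CSV loader follows the same pattern; a brief sketch, assuming a hypothetical data.csv whose label column is named label:

for X_batch, y_batch in csv_data_loader("data.csv", "label", rows_per_batch=1000):
    print(X_batch.shape, y_batch.shape)  # one (features, labels) chunk per iteration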
@overload
def mockup_data_loader(
    X: pd.DataFrame,
    y: pd.Series,
    n_batches: int,
    astype: Literal["pandas"],
    shuffle: bool = False,
) -> Iterable[_PandasBatch]:
    ...


@overload
def mockup_data_loader(
    X: pd.DataFrame,
    y: pd.Series,
    n_batches: int,
    astype: datatype_param_type,
    shuffle: bool = False,
) -> Iterable[_PandasOrSparkBatch]:
    ...
def mockup_data_loader(
    X: pd.DataFrame,
    y: pd.Series,
    n_batches: int,
    astype: datatype_param_type,
    shuffle: bool = False,
) -> Iterable[_PandasOrSparkBatch]:
    """Split (X, y) into batches to emulate loading them incrementally.

    Only intended for testing purposes, because if X and y are already
    materialized in-memory, there is little reason to batch them.
    """
    pandas_gen: Iterable[_PandasBatch]
    if n_batches == 1:
        pandas_gen = [(X, y)]
    else:
        cv = sklearn.model_selection.KFold(n_batches, shuffle=shuffle)
        estimator = sklearn.tree.DecisionTreeClassifier()
        pandas_gen = (
            lale.helpers.split_with_schemas(estimator, X, y, test, train)
            for train, test in cv.split(X, y)
        )
    if astype == "pandas":
        return pandas_gen
    elif astype == "spark":
        return ((pandas2spark(X), pandas2spark(y)) for X, y in pandas_gen)
    raise ValueError(f"expected astype in ['pandas', 'spark'], got {astype}")
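A short sketch of mockup_data_loader with a toy in-memory dataset (the column and label names are made up for illustration); with astype="pandas" it yields pandas (X, y) folds, and with astype="spark" the same folds converted to Spark dataframes:

X = pd.DataFrame({"f0": [1, 2, 3, 4], "f1": [5, 6, 7, 8]})
y = pd.Series([0, 1, 0, 1], name="label")
for X_batch, y_batch in mockup_data_loader(X, y, n_batches=2, astype="pandas"):
    print(len(X_batch), len(y_batch))  # each of the 2 folds holds 2 rows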
def openml_data_loader(dataset_name: str, batch_size: int) -> Iterable[_PandasBatch]:
    """Download the OpenML dataset, incrementally load it, and yield it one (X, y) batch at a time."""
    assert liac_arff_installed
    metadata = openml_datasets.experiments_dict[dataset_name]
    label_name = cast(str, metadata["target"]).lower()
    file_name = openml_datasets.download_if_missing(dataset_name)
    return arff_data_loader(file_name, label_name, batch_size)
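An illustrative call to the OpenML loader; the dataset name "credit-g" is assumed here to be one of the keys in openml_datasets.experiments_dict, and the first call may download the ARFF file before batching starts:

batches = openml_data_loader("credit-g", batch_size=5000)
for X_batch, y_batch in batches:
    print(X_batch.shape, y_batch.shape)  # one pandas (X, y) batch per iteration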