Source code for lale.lib.lale.auto_pipeline

# Copyright 2020 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import warnings
from typing import Optional

import hyperopt
import pandas as pd
import sklearn.metrics
import sklearn.model_selection

import lale.docstrings
import lale.helpers
import lale.operators
from lale.lib._common_schemas import (
    schema_best_score_single,
    schema_cv,
    schema_max_opt_time,
    schema_scoring_single,
)

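# Probe optional gradient-boosting backends, silencing their import-time
# FutureWarnings; auto_gbt below picks an estimator based on these flags.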
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)
    try:
        import xgboost  # noqa: F401

        xgboost_installed = True
    except ImportError:
        xgboost_installed = False
    try:
        import lightgbm.sklearn  # noqa: F401

        lightgbm_installed = True
    except ImportError:
        lightgbm_installed = False


def auto_prep(X):
    from lale.lib.lale import ConcatFeatures, Project, categorical
    from lale.lib.sklearn import OneHotEncoder, SimpleImputer

    n_cols = X.shape[1]
    n_cats = len(categorical()(X))
    prep_num = SimpleImputer(strategy="mean")
    prep_cat = SimpleImputer(strategy="most_frequent") >> OneHotEncoder(
        handle_unknown="ignore"
    )
    if n_cats == 0:
        result = prep_num
    elif n_cats == n_cols:
        result = prep_cat
    else:
        result = (
            (
                Project(columns={"type": "number"}, drop_columns=categorical())
                >> prep_num
            )
            & (Project(columns=categorical()) >> prep_cat)
        ) >> ConcatFeatures
    return result
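
# For example, on a mixed DataFrame such as
#     pd.DataFrame({"age": [25.0, None, 40.0], "color": ["red", "blue", None]})
# auto_prep takes the third branch: "age" flows through
# SimpleImputer(strategy="mean"), "color" flows through
# SimpleImputer(strategy="most_frequent") >> OneHotEncoder, and the two
# Project branches are joined by ConcatFeatures.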


def auto_gbt(prediction_type):
    if prediction_type == "regression":
        if xgboost_installed:
            from lale.lib.xgboost import XGBRegressor

            return XGBRegressor(verbosity=0)
        elif lightgbm_installed:
            from lale.lib.lightgbm import LGBMRegressor

            return LGBMRegressor()
        else:
            from lale.lib.sklearn import GradientBoostingRegressor

            return GradientBoostingRegressor()
    else:
        assert prediction_type in ["binary", "multiclass", "classification"]
        if xgboost_installed:
            from lale.lib.xgboost import XGBClassifier

            return XGBClassifier(verbosity=0)
        elif lightgbm_installed:
            from lale.lib.lightgbm import LGBMClassifier

            return LGBMClassifier()
        else:
            from lale.lib.sklearn import GradientBoostingClassifier

            return GradientBoostingClassifier()
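
# For example, auto_gbt("regression") returns XGBRegressor when xgboost is
# installed, falls back to LGBMRegressor when only lightgbm is installed, and
# otherwise returns scikit-learn's GradientBoostingRegressor; the
# classification branch mirrors the same fallback order.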


class _AutoPipelineImpl:
    _summary: Optional[pd.DataFrame]

    def __init__(
        self,
        *,
        prediction_type="classification",
        scoring=None,
        best_score=0.0,
        verbose=False,
        max_evals=100,
        max_opt_time=600.0,
        max_eval_time=120.0,
        cv=5,
    ):
        self.prediction_type = prediction_type
        self.max_opt_time = max_opt_time
        self.max_eval_time = max_eval_time
        self.max_evals = max_evals
        self.verbose = verbose
        if scoring is None:
            scoring = "r2" if prediction_type == "regression" else "accuracy"
        self.scoring = scoring
        self._scorer = sklearn.metrics.get_scorer(scoring)
        self.best_score = best_score
        self._summary = None
        self.cv = cv

    def _try_and_add(self, name, trainable, X, y):
        assert name not in self._pipelines
        if self._name_of_best is not None:
            if time.time() > self._start_fit + self.max_opt_time:
                return
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            cv = sklearn.model_selection.check_cv(
                cv=self.cv, classifier=(self.prediction_type != "regression")
            )
            (
                cv_score,
                logloss,
                execution_time,
            ) = lale.helpers.cross_val_score_track_trials(
                trainable, X, y, self.scoring, cv
            )
        loss = self.best_score - cv_score
        if self._name_of_best is None or (
            self._summary is None
            or loss < self._summary.at[self._name_of_best, "loss"]
        ):
            self._name_of_best = name
        record = {
            "name": name,
            "loss": loss,
            "time": execution_time,
            "log_loss": logloss,
            "status": hyperopt.STATUS_OK,
        }
        singleton_summary = pd.DataFrame.from_records([record], index="name")
        if self._summary is None:
            self._summary = singleton_summary
        else:
            self._summary = pd.concat([self._summary, singleton_summary])
        if name == self._name_of_best:
            self._pipelines[name] = trainable.fit(X, y)
        else:
            self._pipelines[name] = trainable

    def _fit_dummy(self, X, y):
        from lale.lib.sklearn import DummyClassifier, DummyRegressor

        if self.prediction_type == "regression":
            trainable = DummyRegressor()
        else:
            trainable = DummyClassifier()
        self._try_and_add("dummy", trainable, X, y)

    def _fit_gbt_num(self, X, y):
        from lale.lib.lale import Project
        from lale.lib.sklearn import SimpleImputer

        gbt = auto_gbt(self.prediction_type)
        trainable = (
            Project(columns={"type": "number"})
            >> SimpleImputer(strategy="mean")
            >> gbt
        )
        self._try_and_add("gbt_num", trainable, X, y)

    def _fit_gbt_all(self, X, y):
        prep = auto_prep(X)
        gbt = auto_gbt(self.prediction_type)
        trainable = prep >> gbt
        self._try_and_add("gbt_all", trainable, X, y)

    def _fit_hyperopt(self, X, y):
        from lale.lib.lale import Hyperopt, NoOp
        from lale.lib.sklearn import (
            PCA,
            DecisionTreeClassifier,
            DecisionTreeRegressor,
            KNeighborsClassifier,
            KNeighborsRegressor,
            MinMaxScaler,
            RandomForestClassifier,
            RandomForestRegressor,
            RobustScaler,
            SelectKBest,
            SGDClassifier,
            SGDRegressor,
            StandardScaler,
        )

        remaining_time = self.max_opt_time - (time.time() - self._start_fit)
        if remaining_time <= 0:
            return
        prep = auto_prep(X)
        scale = MinMaxScaler | StandardScaler | RobustScaler | NoOp
        reduce_dims = PCA | SelectKBest | NoOp
        gbt = auto_gbt(self.prediction_type)
        if self.prediction_type == "regression":
            estim_trees = gbt | DecisionTreeRegressor | RandomForestRegressor
            estim_notree = SGDRegressor | KNeighborsRegressor
        else:
            estim_trees = gbt | DecisionTreeClassifier | RandomForestClassifier
            estim_notree = SGDClassifier | KNeighborsClassifier
        model_trees = reduce_dims >> estim_trees
        model_notree = scale >> reduce_dims >> estim_notree
        planned = prep >> (model_trees | model_notree)
        prior_evals = self._summary.shape[0] if self._summary is not None else 0
        trainable = Hyperopt(
            estimator=planned,
            max_evals=self.max_evals - prior_evals,
            scoring=self.scoring,
            best_score=self.best_score,
            max_opt_time=remaining_time,
            max_eval_time=self.max_eval_time,
            verbose=self.verbose,
            show_progressbar=False,
            cv=self.cv,
        )
        trained = trainable.fit(X, y)
        # The static types are not currently smart enough to verify
        # that the conditionally defined summary method is actually present.
        # But it must be, since the hyperopt impl type provides it.
        summary: pd.DataFrame = trained.summary()  # type: ignore
        if list(summary.status) == ["new"]:
            return  # only one trial and that one timed out
        best_trial = trained._impl._trials.best_trial
        if "loss" in best_trial["result"]:
            if (
                self._summary is None
                or best_trial["result"]["loss"]
                < self._summary.at[self._name_of_best, "loss"]
            ):
                self._name_of_best = f'p{best_trial["tid"]}'
        if self._summary is None:
            self._summary = summary
        else:
            self._summary = pd.concat([self._summary, summary])
        for name in summary.index:
            assert name not in self._pipelines
            if summary.at[name, "status"] == hyperopt.STATUS_OK:
                self._pipelines[name] = trained.get_pipeline(name)

    def fit(self, X, y):
        self._start_fit = time.time()
        self._name_of_best = None
        self._summary = None
        self._pipelines = {}
        self._fit_dummy(X, y)
        self._fit_gbt_num(X, y)
        self._fit_gbt_all(X, y)
        self._fit_hyperopt(X, y)
        return self

    def predict(self, X, **predict_params):
        best_pipeline = self._pipelines[self._name_of_best]
        result = best_pipeline.predict(X, **predict_params)
        return result

    def summary(self):
        """Table summarizing the trial results (name, tid, loss, time, log_loss, status).

        Returns
        -------
        result : DataFrame"""
        if self._summary is not None:
            self._summary.sort_values(by="loss", inplace=True)
        return self._summary

    def get_pipeline(
        self,
        pipeline_name: Optional[str] = None,
        astype: lale.helpers.astype_type = "lale",
    ):
        """Retrieve one of the trials.

        Parameters
        ----------
        pipeline_name : union type, default None

            - string
                Key for table returned by summary(), return a trainable pipeline.

            - None
                When not specified, return the best trained pipeline found.

        astype : 'lale' or 'sklearn', default 'lale'
            Type of resulting pipeline.

        Returns
        -------
        result : Trained operator if best, trainable operator otherwise."""
        if pipeline_name is None:
            pipeline_name = self._name_of_best
        result = self._pipelines[pipeline_name]
        if result is None or astype == "lale":
            return result
        assert astype == "sklearn", astype
        return result.export_to_sklearn_pipeline()


_hyperparams_schema = {
    "allOf": [
        {
            "type": "object",
            "required": [
                "prediction_type",
                "scoring",
                "max_evals",
                "max_opt_time",
                "max_eval_time",
                "cv",
            ],
            "relevantToOptimizer": [],
            "additionalProperties": False,
            "properties": {
                "prediction_type": {
                    "description": "The kind of learning problem.",
                    "enum": ["binary", "multiclass", "classification", "regression"],
                    "default": "classification",
                },
                "scoring": schema_scoring_single,
                "best_score": schema_best_score_single,
                "verbose": {
                    "description": """Whether to print errors from each of the trials if any.
This is also logged using logger.warning in Hyperopt.""",
                    "type": "boolean",
                    "default": False,
                },
                "max_evals": {
                    "description": "Number of trials of Hyperopt search.",
                    "type": "integer",
                    "minimum": 1,
                    "default": 100,
                },
                "max_opt_time": {
                    **schema_max_opt_time,
                    "default": 600.0,
                },
                "max_eval_time": {
                    "description": "Maximum time in seconds for each evaluation.",
                    "anyOf": [
                        {"type": "number", "minimum": 0.0, "exclusiveMinimum": True},
                        {"description": "No runtime bound.", "enum": [None]},
                    ],
                    "default": 120.0,
                },
                "cv": schema_cv,
            },
        }
    ]
}

_input_fit_schema = {
    "type": "object",
    "required": ["X", "y"],
    "properties": {
        "X": {
            "type": "array",
            "items": {"type": "array", "items": {"laleType": "Any"}},
        },
        "y": {
            "anyOf": [
                {"type": "array", "items": {"type": "number"}},
                {"type": "array", "items": {"type": "string"}},
                {"type": "array", "items": {"type": "boolean"}},
            ]
        },
    },
}

_input_predict_schema = {
    "type": "object",
    "required": ["X"],
    "properties": {
        "X": {
            "type": "array",
            "items": {"type": "array", "items": {"laleType": "Any"}},
        }
    },
}

_output_predict_schema = {
    "anyOf": [
        {"type": "array", "items": {"type": "number"}},
        {"type": "array", "items": {"type": "string"}},
        {"type": "array", "items": {"type": "boolean"}},
    ]
}

_combined_schemas = {
    "description": """Automatically find a pipeline for a dataset.

This is a high-level entry point to get an initial trained pipeline
without having to specify your own planned pipeline first. It is
designed to be simple at the expense of not offering much control.
For an example, see `demo_auto_pipeline.ipynb`_.

.. _`demo_auto_pipeline.ipynb`: https://github.com/IBM/lale/blob/master/examples/demo_auto_pipeline.ipynb
""",
    "documentation_url": "https://lale.readthedocs.io/en/latest/modules/lale.lib.lale.auto_pipeline.html",
    "import_from": "lale.lib.lale",
    "type": "object",
    "tags": {"pre": [], "op": ["estimator"], "post": []},
    "properties": {
        "hyperparams": _hyperparams_schema,
        "input_fit": _input_fit_schema,
        "input_predict": _input_predict_schema,
        "output_predict": _output_predict_schema,
    },
}


AutoPipeline = lale.operators.make_operator(_AutoPipelineImpl, _combined_schemas)

lale.docstrings.set_docstrings(AutoPipeline)
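

# A minimal usage sketch, assuming scikit-learn's bundled Iris data; the small
# max_evals and max_opt_time budgets are arbitrary illustrative choices.
if __name__ == "__main__":
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    data_X, data_y = load_iris(return_X_y=True, as_frame=True)
    train_X, test_X, train_y, test_y = train_test_split(
        data_X, data_y, random_state=42
    )
    auto_trainable = AutoPipeline(
        prediction_type="multiclass", max_evals=5, max_opt_time=90.0
    )
    auto_trained = auto_trainable.fit(train_X, train_y)
    predictions = auto_trained.predict(test_X)
    print("accuracy:", sklearn.metrics.accuracy_score(test_y, predictions))
    print(auto_trained.summary())  # one row per trial, sorted by loss
    best_sklearn = auto_trained.get_pipeline(astype="sklearn")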