# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import copy
import importlib
import logging
import time
import traceback
from importlib import util
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
TypeVar,
Union,
overload,
)
import numpy as np
import pandas as pd
import scipy.sparse
import sklearn.pipeline
from numpy.random import RandomState
from sklearn.metrics import accuracy_score, check_scoring, log_loss
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.metaestimators import _safe_split
import lale.datasets.data_schemas
# Optional-dependency detection: torch and pyspark are only needed for some
# features, so record availability instead of failing at import time.
try:
    import torch

    torch_installed = True
except ImportError:
    torch_installed = False

# find_spec detects pyspark without actually importing (executing) it.
spark_loader = util.find_spec("pyspark")
spark_installed = spark_loader is not None
if spark_installed:
    from pyspark.sql.dataframe import DataFrame as spark_df

logger = logging.getLogger(__name__)

# Dictionary key used to mark a nested hyperopt search space inside a
# hyperparameter dictionary (see make_nested_hyperopt_space below).
LALE_NESTED_SPACE_KEY = "__lale_nested_space"

# Type aliases for parameter kinds shared across lale APIs.
astype_type = Literal["lale", "sklearn"]
datatype_param_type = Literal["pandas", "spark"]
randomstate_type = Union[RandomState, int, None]
def make_nested_hyperopt_space(sub_space):
    """Wrap ``sub_space`` in a dict that marks it as a nested hyperopt space."""
    wrapped = {}
    wrapped[LALE_NESTED_SPACE_KEY] = sub_space
    return wrapped
def assignee_name(level=1) -> Optional[str]:
    """Return the variable name being assigned at a caller's call site.

    Walks ``level`` frames up the stack, parses the source text of that
    line, and, if it is a single assignment to a plain name (``x = ...``),
    returns that name.

    Parameters
    ----------
    level : how many frames above the immediate caller to inspect.

    Returns
    -------
    The assigned name, or None if the line is unavailable, unparsable, or
    not a simple single-target assignment.
    """
    tb = traceback.extract_stack()
    file_name, _line_number, _function_name, text = tb[-(level + 2)]
    if text is None:
        # Source is unavailable (e.g., REPL, exec, or stripped bytecode);
        # ast.parse(None) would raise TypeError, so bail out early.
        return None
    try:
        tree = ast.parse(text, file_name)
    except SyntaxError:
        return None
    assert tree is not None and isinstance(tree, ast.Module)
    if len(tree.body) == 1:
        stmt = tree.body[0]
        if isinstance(stmt, ast.Assign):
            lhs = stmt.targets
            if len(lhs) == 1:
                res = lhs[0]
                if isinstance(res, ast.Name):
                    return res.id
    return None
def arg_name(pos=0, level=1) -> Optional[str]:
    """Return the name of a positional argument at a caller's call site.

    Walks ``level`` frames up the stack, parses the source text of that
    line, and, if it is a bare call expression whose argument at position
    ``pos`` is a plain name, returns that name.

    Parameters
    ----------
    pos : index of the positional argument to inspect.
    level : how many frames above the immediate caller to inspect.

    Returns
    -------
    The argument's name, or None if the line is unavailable, unparsable,
    or not a bare call with a plain-name argument at ``pos``.
    """
    tb = traceback.extract_stack()
    file_name, _line_number, _function_name, text = tb[-(level + 2)]
    if text is None:
        # Source is unavailable (e.g., REPL, exec, or stripped bytecode);
        # ast.parse(None) would raise TypeError, so bail out early.
        return None
    try:
        tree = ast.parse(text, file_name)
    except SyntaxError:
        return None
    assert tree is not None and isinstance(tree, ast.Module)
    if len(tree.body) == 1:
        stmt = tree.body[0]
        if isinstance(stmt, ast.Expr):
            expr = stmt.value
            if isinstance(expr, ast.Call):
                args = expr.args
                if pos < len(args):
                    res = args[pos]
                    if isinstance(res, ast.Name):
                        return res.id
    return None
def data_to_json(data, subsample_array: bool = True) -> Union[list, dict, int, float]:
    """Recursively convert ``data`` into JSON-friendly Python values.

    Tuples and lists become lists; dicts are converted value-by-value;
    numpy arrays, sparse matrices, pandas frames/series, and torch tensors
    go through ndarray_to_json; numpy scalars become Python ints/floats;
    everything else passes through unchanged.
    """
    if isinstance(data, (tuple, list)):
        # tuples are rendered as JSON lists
        return [data_to_json(item, subsample_array) for item in data]
    if isinstance(data, dict):
        return {k: data_to_json(v, subsample_array) for k, v in data.items()}
    if isinstance(data, np.ndarray):
        return ndarray_to_json(data, subsample_array)
    if isinstance(data, scipy.sparse.csr_matrix):
        return ndarray_to_json(data.toarray(), subsample_array)
    if isinstance(data, (pd.DataFrame, pd.Series)):
        return ndarray_to_json(data.values, subsample_array)
    if torch_installed and isinstance(data, torch.Tensor):
        return ndarray_to_json(data.detach().numpy(), subsample_array)
    if isinstance(data, (np.int64, np.int32, np.int16)):  # type: ignore
        return int(data)
    if isinstance(data, (np.float32, np.float64)):  # type: ignore
        return float(data)
    return data
def is_empty_dict(val) -> bool:
    """Return True exactly when ``val`` is a dict with no entries."""
    if not isinstance(val, dict):
        return False
    return len(val) == 0
def dict_without(orig_dict: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return ``orig_dict`` itself when ``key`` is absent, otherwise a
    shallow copy with ``key`` removed (the original is never mutated)."""
    if key in orig_dict:
        return {k: v for k, v in orig_dict.items() if k != key}
    return orig_dict
def json_lookup(ptr, jsn, default=None):
    """Look up a slash-separated pointer in a nested dictionary document.

    :param ptr: pointer such as ``"a/b/c"``
    :param jsn: nested dictionary (JSON-like) document
    :param default: value to return when the pointer does not resolve
    :return: the value found at ``ptr``, or ``default``
    """
    sub_jsn = jsn
    for step in ptr.split("/"):
        # Guard against descending into a non-mapping leaf; without this,
        # the `in` test raises TypeError for e.g. int leaves.
        if not isinstance(sub_jsn, Mapping) or step not in sub_jsn:
            return default
        sub_jsn = sub_jsn[step]
    return sub_jsn
def ndarray_to_json(arr: np.ndarray, subsample_array: bool = True) -> Union[list, dict]:
    """Convert a numpy array (up to 3-d) into nested JSON lists.

    When ``subsample_array`` is True, only the first 10 rows of the leading
    dimension are emitted; the other dimensions are never truncated.
    """
    no_limit = np.iinfo(int).max
    limits: List[int] = [no_limit, no_limit, no_limit]
    if subsample_array:
        # sample 10 rows and no limit on columns
        limits[0] = 10

    def convert(prefix: Tuple[int, ...]) -> Any:
        depth = len(prefix)
        if depth < len(arr.shape):
            extent = min(limits[depth], arr.shape[depth])
            return [convert(prefix + (i,)) for i in range(extent)]
        element = arr[prefix]
        if isinstance(element, (bool, int, float, str)):
            return element
        if np.issubdtype(arr.dtype, np.bool_):
            return bool(element)
        if np.issubdtype(arr.dtype, np.integer):
            return int(element)
        if np.issubdtype(arr.dtype, np.number):
            return float(element)
        if arr.dtype.kind in ["U", "S", "O"]:
            return str(element)
        raise ValueError(
            f"Unexpected dtype {arr.dtype}, "
            f"kind {arr.dtype.kind}, "
            f"type {type(element)}."
        )

    return convert(())
def split_with_schemas(estimator, all_X, all_y, indices, train_indices=None):
    """Split X and y via sklearn's _safe_split and, when the originals carry
    a lale JSON schema, attach a row-count-adjusted copy to each subset."""
    subset_X, subset_y = _safe_split(estimator, all_X, all_y, indices, train_indices)

    def attach_schema(original, subset):
        # Only datasets that carry a json_schema need the adjusted copy.
        if hasattr(original, "json_schema"):
            n_rows = subset.shape[0]
            lale.datasets.data_schemas.add_schema(
                subset,
                {
                    "type": "array",
                    "minItems": n_rows,
                    "maxItems": n_rows,
                    "items": original.json_schema["items"],
                },
            )

    attach_schema(all_X, subset_X)
    attach_schema(all_y, subset_y)
    return subset_X, subset_y
def fold_schema(X, y, cv=1, is_classifier=True):
    """Build a JSON schema describing one cross-validation training fold of
    (X, y), with minItems/maxItems set to the estimated fold size."""

    def with_n_rows(data, n_rows):
        base = lale.datasets.data_schemas._to_schema(data)
        return {**base, "minItems": n_rows, "maxItems": n_rows}

    n_splits = cv if isinstance(cv, int) else cv.get_n_splits()
    try:
        n_samples = X.shape[0] if hasattr(X, "shape") else len(X)
    except TypeError:  # raised for Spark dataframes.
        n_samples = X.count() if hasattr(X, "count") else 0

    if n_splits == 1:
        n_rows_fold = n_samples
    else:
        n_rows_fold = (n_samples // n_splits) * (n_splits - 1)
        if is_classifier:
            # in the stratified case, fold sizes can differ by up to n_classes
            n_rows_fold = max(1, n_rows_fold - len(set(y)))
    return {
        "properties": {
            "X": with_n_rows(X, n_rows_fold),
            "y": with_n_rows(y, n_rows_fold),
        }
    }
def cross_val_score_track_trials(
    estimator,
    X,
    y=None,
    scoring: Any = accuracy_score,
    cv: Any = 5,
    args_to_scorer: Optional[Dict[str, Any]] = None,
    args_to_cv: Optional[Dict[str, Any]] = None,
    **fit_params,
):
    """
    Use the given estimator to perform fit and predict for splits defined by 'cv' and compute the given score on
    each of the splits.

    Parameters
    ----------
    estimator: A valid sklearn_wrapper estimator
    X: Valid data that works with the estimator
    y: Valid target that works with the estimator
    scoring: string or a scorer object created using
        https://scikit-learn.org/stable/modules/generated/sklearn.metrics.make_scorer.html#sklearn.metrics.make_scorer.
        A string from sklearn.metrics.SCORERS.keys() can be used or a scorer created from one of
        sklearn.metrics (https://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics).
        A completely custom scorer object can be created from a python function following the example at
        https://scikit-learn.org/stable/modules/model_evaluation.html
        The metric has to return a scalar value,
    cv: an integer or an object that has a split function as a generator yielding (train, test) splits as arrays of indices.
        Integer value is used as number of folds in sklearn.model_selection.StratifiedKFold, default is 5.
        Note that any of the iterators from https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators can be used here.
    args_to_scorer: A dictionary of additional keyword arguments to pass to the scorer.
        Used for cases where the scorer has a signature such as ``scorer(estimator, X, y, **kwargs)``.
    args_to_cv: A dictionary of additional keyword arguments to pass to the split method of cv.
        This is only applicable when cv is not an integer.
    fit_params: Additional parameters that should be passed when calling fit on the estimator

    Returns
    -------
    result: a 3-tuple of (mean score across folds, mean log loss across
        folds, mean fit+score wall-clock time per fold). If log loss could
        not be computed for any fold, its mean is NaN (mean of empty array).
    """
    if isinstance(cv, int):
        cv = StratifiedKFold(cv)
    if args_to_scorer is None:
        args_to_scorer = {}
    if args_to_cv is None:
        args_to_cv = {}
    scorer = check_scoring(estimator, scoring=scoring)
    cv_results: List[float] = []
    log_loss_results = []
    time_results = []
    for train, test in cv.split(X, y, **args_to_cv):
        # split_with_schemas preserves lale JSON schemas on the subsets
        X_train, y_train = split_with_schemas(estimator, X, y, train)
        X_test, y_test = split_with_schemas(estimator, X, y, test, train)
        start = time.time()
        # Not calling sklearn.base.clone() here, because:
        # (1) For Lale pipelines, clone() calls the pipeline constructor
        # with edges=None, so the resulting topology is incorrect.
        # (2) For Lale individual operators, the fit() method already
        # clones the impl object, so cloning again is redundant.
        trained = estimator.fit(X_train, y_train, **fit_params)
        score_value = scorer(trained, X_test, y_test, **args_to_scorer)
        execution_time = time.time() - start
        # not all estimators have predict probability
        # NOTE(review): BaseException also swallows KeyboardInterrupt and
        # SystemExit here — presumably intentional best-effort, but confirm.
        try:
            y_pred_proba = trained.predict_proba(X_test)
            logloss = log_loss(y_true=y_test, y_pred=y_pred_proba)
            log_loss_results.append(logloss)
        except BaseException:
            logger.debug("Warning, log loss cannot be computed")
        cv_results.append(score_value)
        time_results.append(execution_time)
    result = (
        np.array(cv_results).mean(),
        np.array(log_loss_results).mean(),
        np.array(time_results).mean(),
    )
    return result
def cross_val_score(estimator, X, y=None, scoring: Any = accuracy_score, cv: Any = 5):
    """
    Use the given estimator to perform fit and predict for splits defined by 'cv' and compute the given score on
    each of the splits.

    Parameters
    ----------
    estimator: A valid sklearn_wrapper estimator
    X: Valid data value that works with the estimator
    y: Valid target value that works with the estimator
    scoring: a scorer object from sklearn.metrics (https://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics)
        Default value is accuracy_score.
    cv: an integer or an object that has a split function as a generator yielding (train, test) splits as arrays of indices.
        Integer value is used as number of folds in sklearn.model_selection.StratifiedKFold, default is 5.
        Note that any of the iterators from https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators can be used here.

    Returns
    -------
    cv_results: a list of scores corresponding to each cross validation fold
    """
    splitter = StratifiedKFold(cv) if isinstance(cv, int) else cv
    scores = []
    for train_idx, test_idx in splitter.split(X, y):
        # carry lale JSON schemas onto the per-fold subsets
        X_train, y_train = split_with_schemas(estimator, X, y, train_idx)
        X_test, y_test = split_with_schemas(estimator, X, y, test_idx, train_idx)
        trained = estimator.fit(X_train, y_train)
        scores.append(scoring(y_test, trained.predict(X_test)))
    return scores
def create_individual_op_using_reflection(class_name, operator_name, param_dict):
    """Instantiate the class named by the fully qualified ``class_name``.

    ``param_dict``, when not None, is passed as keyword arguments to the
    constructor. Returns None when ``class_name`` is None.
    """
    if class_name is None:
        return None
    parts = class_name.split(".")
    assert (
        len(parts) > 1
    ), "The class name needs to be fully qualified, i.e. module name + class name"
    module = importlib.import_module(".".join(parts[:-1]))
    class_ = getattr(module, parts[-1])
    if param_dict is None:
        return class_()
    return class_(**param_dict)
if TYPE_CHECKING:
import lale.operators
def to_graphviz(
    lale_operator: "lale.operators.Operator",
    ipython_display: bool = True,
    call_depth: int = 1,
    **dot_graph_attr,
):
    """Render a lale operator as a graphviz visualization.

    Raises TypeError when the argument is not a lale Operator.
    """
    import lale.json_operator
    import lale.operators
    import lale.visualize

    if not isinstance(lale_operator, lale.operators.Operator):
        raise TypeError("The input to to_graphviz needs to be a valid LALE operator.")
    json_repr = lale.json_operator.to_json(lale_operator, call_depth=call_depth + 1)
    return lale.visualize.json_to_graphviz(json_repr, ipython_display, dot_graph_attr)
def instantiate_from_hyperopt_search_space(obj_hyperparams, new_hyperparams):
    """Recursively merge hyperopt-sampled values into existing hyperparams.

    Returns a replacement value when one is needed, or None when the caller
    should keep its existing value. NOTE: when ``new_hyperparams`` is a
    dict (without the nested-space key), it is updated IN PLACE and None is
    returned.
    """
    if isinstance(new_hyperparams, dict) and LALE_NESTED_SPACE_KEY in new_hyperparams:
        sub_params = new_hyperparams[LALE_NESTED_SPACE_KEY]

        sub_op = obj_hyperparams
        if isinstance(sub_op, list):
            if len(sub_op) == 1:
                sub_op = sub_op[0]
            else:
                # NOTE(review): list(sub_params)[0] iterates dict KEYS, so this
                # unpacking only works if each key is a (step_index, params)
                # tuple; elsewhere in this file .items() is used for the same
                # pattern — verify this is not a latent bug.
                step_index, step_params = list(sub_params)[0]
                if step_index < len(sub_op):
                    sub_op = sub_op[step_index]
                    sub_params = step_params

        return create_instance_from_hyperopt_search_space(sub_op, sub_params)

    elif isinstance(new_hyperparams, (list, tuple)):
        assert isinstance(obj_hyperparams, (list, tuple))
        params_len = len(new_hyperparams)
        assert params_len == len(obj_hyperparams)
        # res stays None unless at least one element needed replacement
        res: Optional[List[Any]] = None

        for i in range(params_len):
            nhi = new_hyperparams[i]
            ohi = obj_hyperparams[i]
            updated_params = instantiate_from_hyperopt_search_space(ohi, nhi)
            if updated_params is not None:
                if res is None:
                    res = list(new_hyperparams)
                res[i] = updated_params
        if res is not None:
            if isinstance(obj_hyperparams, tuple):
                return tuple(res)
            else:
                return res
        # workaround for what seems to be a hyperopt bug
        # where hyperopt returns a tuple even though the
        # hyperopt search space specifies a list
        is_obj_tuple = isinstance(obj_hyperparams, tuple)
        is_new_tuple = isinstance(new_hyperparams, tuple)
        if is_obj_tuple != is_new_tuple:
            if is_obj_tuple:
                return tuple(new_hyperparams)
            else:
                return list(new_hyperparams)
        return None

    elif isinstance(new_hyperparams, dict):
        assert isinstance(obj_hyperparams, dict)
        # mutate new_hyperparams in place; keys absent from obj_hyperparams
        # are left untouched
        for k, sub_params in new_hyperparams.items():
            if k in obj_hyperparams:
                sub_op = obj_hyperparams[k]
                updated_params = instantiate_from_hyperopt_search_space(
                    sub_op, sub_params
                )
                if updated_params is not None:
                    new_hyperparams[k] = updated_params
        return None
    else:
        # scalar leaf: caller keeps the sampled value as-is
        return None
def create_instance_from_hyperopt_search_space(
    lale_object, hyperparams
) -> "lale.operators.Operator":
    """Instantiate a trainable operator/pipeline from hyperopt-sampled params.

    Hyperparams is a n-tuple of dictionaries of hyper-parameters, each
    dictionary corresponds to an operator in the pipeline.
    """
    # lale_object can either be an individual operator, a pipeline or an operatorchoice
    # Validate that the number of elements in the n-tuple is the same
    # as the number of steps in the current pipeline
    from lale.operators import (
        BasePipeline,
        OperatorChoice,
        PlannedIndividualOp,
        TrainableOperator,
        TrainablePipeline,
    )

    if isinstance(lale_object, PlannedIndividualOp):
        # "name" is hyperopt bookkeeping, not a real hyperparameter
        new_hyperparams: Dict[str, Any] = dict_without(hyperparams, "name")
        hps = lale_object.hyperparams()
        if hps:
            obj_hyperparams = dict(hps)
        else:
            obj_hyperparams = {}

        # merge sampled values into the operator's existing hyperparams
        for k, sub_params in new_hyperparams.items():
            if k in obj_hyperparams:
                sub_op = obj_hyperparams[k]
                updated_params = instantiate_from_hyperopt_search_space(
                    sub_op, sub_params
                )
                if updated_params is not None:
                    new_hyperparams[k] = updated_params

        # sampled values take precedence over the operator's existing ones
        all_hyperparams = {**obj_hyperparams, **new_hyperparams}
        return lale_object(**all_hyperparams)
    elif isinstance(lale_object, BasePipeline):
        steps = lale_object.steps_list()
        if len(hyperparams) != len(steps):
            raise ValueError(
                "The number of steps in the hyper-parameter space does not match the number of steps in the pipeline."
            )
        op_instances = []
        edges = lale_object.edges()
        # op_map:Dict[PlannedOpType, TrainableOperator] = {}
        op_map = {}
        for op_index, sub_params in enumerate(hyperparams):
            sub_op = steps[op_index]
            op_instance = create_instance_from_hyperopt_search_space(sub_op, sub_params)
            assert isinstance(op_instance, TrainableOperator)
            assert (
                isinstance(sub_op, OperatorChoice)
                or sub_op.class_name() == op_instance.class_name()
            ), f"sub_op {sub_op.class_name()}, op_instance {op_instance.class_name()}"
            op_instances.append(op_instance)
            op_map[sub_op] = op_instance

        # remap the original edges onto the newly instantiated steps
        # trainable_edges:List[Tuple[TrainableOperator, TrainableOperator]]
        try:
            trainable_edges = [(op_map[x], op_map[y]) for (x, y) in edges]
        except KeyError as e:
            raise ValueError(
                "An edge was found with an endpoint that is not a step (" + str(e) + ")"
            ) from e

        return TrainablePipeline(op_instances, trainable_edges, ordered=True)  # type: ignore
    elif isinstance(lale_object, OperatorChoice):
        # Hyperopt search space for an OperatorChoice is generated as a dictionary with a single element
        # corresponding to the choice made, the only key is the index of the step and the value is
        # the params corresponding to that step.
        step_index: int
        choices = lale_object.steps_list()
        if len(choices) == 1:
            step_index = 0
        else:
            step_index_str, hyperparams = list(hyperparams.items())[0]
            step_index = int(step_index_str)
        step_object = choices[step_index]
        return create_instance_from_hyperopt_search_space(step_object, hyperparams)
    else:
        assert False, f"Unknown operator type: {type(lale_object)}"
def find_lale_wrapper(sklearn_obj: Any) -> Optional[Any]:
    """
    :param sklearn_obj: An sklearn compatible object that may have a lale wrapper
    :return: The lale wrapper type, or None if one could not be found
    """
    from .operator_wrapper import get_lale_wrapper_modules

    target_name = sklearn_obj.__class__.__name__
    for candidate_module in get_lale_wrapper_modules():
        try:
            module = importlib.import_module(candidate_module)
        except ModuleNotFoundError:
            continue
        try:
            # first module exposing an attribute with the class's name wins
            return getattr(module, target_name)
        except AttributeError:
            continue
    return None
def _import_from_sklearn_inplace_helper(
    sklearn_obj, fitted: bool = True, is_nested=False
):
    """
    This method takes an object and tries to wrap sklearn objects
    (at the top level or contained within hyperparameters of other
    sklearn objects).
    It will modify the object to add in the appropriate lale wrappers.
    It may also return a wrapper or different object than given.

    :param sklearn_obj: the object that we are going to try and wrap
    :param fitted: should we return a TrainedOperator
    :param is_nested: is this a nested invocation (which allows for returning
    a Trainable operator even if fitted is set to True)
    """

    @overload
    def import_nested_params(
        orig_hyperparams: dict, partial_dict: bool
    ) -> Optional[dict]: ...

    @overload
    def import_nested_params(orig_hyperparams: Any, partial_dict: bool) -> Any: ...

    def import_nested_params(orig_hyperparams: Any, partial_dict: bool = False):
        """
        look through lists/tuples/dictionaries for sklearn compatible objects to import.

        :param orig_hyperparams: the input to recursively look through for sklearn compatible objects
        :param partial_dict: If this is True and the input is a dictionary, the returned dictionary will only have the
        keys with modified values
        :return: Either a modified version of the input or None if nothing was changed
        """
        if isinstance(orig_hyperparams, (tuple, list)):
            new_list: list = []
            list_modified: bool = False
            for e in orig_hyperparams:
                new_e = import_nested_params(e, partial_dict=False)
                if new_e is None:
                    # None means "unchanged": keep the original element
                    new_list.append(e)
                else:
                    new_list.append(new_e)
                    list_modified = True
            if not list_modified:
                return None
            if isinstance(orig_hyperparams, tuple):
                return tuple(new_list)
            else:
                return new_list
        if isinstance(orig_hyperparams, dict):
            new_dict: dict = {}
            dict_modified: bool = False
            for k, v in orig_hyperparams.items():
                new_v = import_nested_params(v, partial_dict=False)
                if new_v is None:
                    if not partial_dict:
                        new_dict[k] = v
                else:
                    new_dict[k] = new_v
                    dict_modified = True
            if not dict_modified:
                return None
            return new_dict
        if isinstance(orig_hyperparams, object) and hasattr(
            orig_hyperparams, "get_params"
        ):
            # anything with get_params is treated as sklearn-compatible
            newobj = _import_from_sklearn_inplace_helper(
                orig_hyperparams, fitted=fitted, is_nested=True
            )  # allow nested_op to be trainable
            if newobj is orig_hyperparams:
                return None
            return newobj
        return None

    if sklearn_obj is None:
        return None
    if isinstance(sklearn_obj, lale.operators.TrainedIndividualOp):
        # if fitted=False, we may want to return a TrainedIndidivualOp
        return sklearn_obj
    # if the object is a trainable operator, we clean that up
    if isinstance(sklearn_obj, lale.operators.TrainableIndividualOp) and hasattr(
        sklearn_obj, "_trained"
    ):
        if fitted:
            # get rid of the indirection, and just return the trained operator directly
            return sklearn_obj._trained
        else:
            # since we are not supposed to be trained, delete the trained part
            delattr(sklearn_obj, "_trained")  # delete _trained before returning
            return sklearn_obj
    if isinstance(sklearn_obj, lale.operators.Operator):
        if (
            fitted and is_nested or not hasattr(sklearn_obj._impl_instance(), "fit")
        ):  # Operators such as NoOp do not have a fit, so return them as is.
            return sklearn_obj
        if fitted:
            raise ValueError(
                f"""The input pipeline has an operator {sklearn_obj} that is not trained and fitted is set to True,
                please pass fitted=False if you want a trainable pipeline as output."""
            )
        # the lale operator is not trained and fitted=False
        return sklearn_obj
    # special case for FeatureUnion.
    # An alternative would be to (like for sklearn pipeline)
    # create a lale wrapper for the sklearn feature union
    # as a higher order operator
    # and then the special case would be just to throw away the outer wrapper
    # Note that lale union does not currently support weights or other features of feature union.
    if isinstance(sklearn_obj, sklearn.pipeline.FeatureUnion):
        transformer_list = sklearn_obj.transformer_list
        concat_predecessors = [
            _import_from_sklearn_inplace_helper(
                transformer[1], fitted=fitted, is_nested=is_nested
            )
            for transformer in transformer_list
        ]
        return lale.operators.make_union(*concat_predecessors)
    if not hasattr(sklearn_obj, "get_params"):
        # if it does not have a get_params method,
        # then we just return it without trying to wrap it
        return sklearn_obj
    class_ = find_lale_wrapper(sklearn_obj)
    if not class_:
        return sklearn_obj  # Return the original object
    # next, we need to figure out what the right hyperparameters are
    orig_hyperparams = sklearn_obj.get_params(deep=False)
    hyperparams = import_nested_params(orig_hyperparams, partial_dict=True)
    if hyperparams:
        # if we have updated any of the hyperparameters then we modify them in the actual sklearn object
        try:
            new_obj = sklearn_obj.set_params(**hyperparams)
            if new_obj is not None:
                sklearn_obj = new_obj
        except NotImplementedError:
            # if the set_params method does not work, then do our best
            pass
        all_new_hyperparams = {**orig_hyperparams, **hyperparams}
    else:
        all_new_hyperparams = orig_hyperparams
    # now, we get the lale operator for the wrapper, with the corresponding hyperparameters
    if not fitted:  # If fitted is False, we do not want to return a Trained operator.
        lale_op_obj_base = class_
    else:
        lale_op_obj_base = lale.operators.TrainedIndividualOp(
            class_._name,
            class_._impl,
            class_._schemas,
            None,
            _lale_trained=True,
        )
    lale_op_obj = lale_op_obj_base(**all_new_hyperparams)
    from lale.lib.sklearn import Pipeline as LaleSKPipelineWrapper

    # If this is an sklearn pipeline, then we want to discard the outer wrapper
    # and just return a lale pipeline
    if isinstance(lale_op_obj, LaleSKPipelineWrapper):  # type: ignore
        return lale_op_obj.shallow_impl._pipeline
    # at this point, the object's hyper-parameters are modified as needed
    # and our wrapper is initialized with the correct hyperparameters.
    # Now we need to replace the wrapper impl with our (possibly modified)
    # sklearn object
    cl_shallow_impl = lale_op_obj.shallow_impl
    if hasattr(cl_shallow_impl, "_wrapped_model"):
        cl_shallow_impl._wrapped_model = sklearn_obj
    else:
        lale_op_obj._impl = sklearn_obj
        lale_op_obj._impl_class_ = sklearn_obj.__class__
    return lale_op_obj
def import_from_sklearn(sklearn_obj: Any, fitted: bool = True, in_place: bool = False):
    """
    This method takes an object and tries to wrap sklearn objects
    (at the top level or contained within hyperparameters of other
    sklearn objects).
    It will modify the object to add in the appropriate lale wrappers.
    It may also return a wrapper or different object than given.

    :param sklearn_obj: the object that we are going to try and wrap
    :param fitted: should we return a TrainedOperator
    :param in_place: should we try to mutate what we can in place, or should we
    aggressively deepcopy everything
    :return: The wrapped object (or the input object if we could not wrap it)
    """
    # unless mutation is requested, work on a deep copy of the input
    target = sklearn_obj if in_place else copy.deepcopy(sklearn_obj)
    return _import_from_sklearn_inplace_helper(target, fitted=fitted, is_nested=False)
def import_from_sklearn_pipeline(sklearn_pipeline: Any, fitted: bool = True):
    """
    Note: Same as import_from_sklearn. This alternative name exists for backwards compatibility.

    This method takes an object and tries to wrap sklearn objects
    (at the top level or contained within hyperparameters of other
    sklearn objects).
    It will modify the object to add in the appropriate lale wrappers.
    It may also return a wrapper or different object than given.

    :param sklearn_pipeline: the object that we are going to try and wrap
    :param fitted: should we return a TrainedOperator
    :return: The wrapped object (or the input object if we could not wrap it)
    """
    from typing import cast

    from lale.operators import TrainableOperator

    wrapped = import_from_sklearn(sklearn_pipeline, fitted=fitted, in_place=False)
    # simplify using the returned value in the common case
    return cast(TrainableOperator, wrapped)
class val_wrapper:
    """This is used to wrap values that cause problems for hyper-optimizer backends

    lale will unwrap these when given them as the value of a hyper-parameter"""

    def __init__(self, base):
        self._base = base

    def unwrap_self(self):
        """Peel off exactly one wrapper layer."""
        return self._base

    @classmethod
    def unwrap(cls, obj):
        """Fully unwrap ``obj``, peeling any number of nested wrapper
        layers; non-wrapper values pass through unchanged."""
        while isinstance(obj, cls):
            obj = obj.unwrap_self()
        return obj
def append_batch(data, batch_data):
    """Append ``batch_data`` onto an accumulator ``data`` of the same kind.

    Supports numpy arrays, (X, y) tuples, torch tensors, and pandas
    Series/DataFrames; a None accumulator starts a new one. Mismatched or
    unsupported type combinations fall through to the ValueError below.
    """
    if data is None:
        return batch_data
    elif isinstance(data, np.ndarray):
        if isinstance(batch_data, np.ndarray):
            if len(data.shape) == 1 and len(batch_data.shape) == 1:
                return np.concatenate([data, batch_data])
            else:
                return np.vstack((data, batch_data))
    elif isinstance(data, tuple):
        # (X, y) pair: append each component independently
        X, y = data
        if isinstance(batch_data, tuple):
            batch_X, batch_y = batch_data
            X = append_batch(X, batch_X)
            y = append_batch(y, batch_y)
            return X, y
    elif torch_installed and isinstance(data, torch.Tensor):
        if isinstance(batch_data, torch.Tensor):
            return torch.cat((data, batch_data))
    elif isinstance(data, (pd.Series, pd.DataFrame)):
        return pd.concat([data, batch_data], axis=0)
    # NOTE(review): this h5py branch unpacks batch_data but never appends or
    # returns anything, so it always falls through to the ValueError —
    # looks like an unfinished implementation; confirm intent.
    try:
        import h5py

        if isinstance(data, h5py.File):
            if isinstance(batch_data, tuple):
                batch_X, batch_y = batch_data
    except ModuleNotFoundError:
        pass
    raise ValueError(
        f"{type(data)} is unsupported. Supported types are np.ndarray, torch.Tensor and h5py file"
    )
def create_data_loader(
    X: Any,
    y: Any = None,
    batch_size: int = 1,
    num_workers: int = 0,
    shuffle: bool = True,
):
    """A function that takes a dataset as input and outputs a Pytorch dataloader.

    Parameters
    ----------
    X : Input data.
        The formats supported are Pandas DataFrame, Numpy array,
        a sparse matrix, torch.tensor, torch.utils.data.Dataset, path to a HDF5 file,
        lale.util.batch_data_dictionary_dataset.BatchDataDict,
        a Python dictionary of the format `{"dataset": torch.utils.data.Dataset,
        "collate_fn":collate_fn for torch.utils.data.DataLoader}`
    y : Labels., optional
        Supported formats are Numpy array or Pandas series, by default None
    batch_size : int, optional
        Number of samples in each batch, by default 1
    num_workers : int, optional
        Number of workers used by the data loader, by default 0
    shuffle: boolean, optional, default True
        Whether to use SequentialSampler or RandomSampler for creating batches

    Returns
    -------
    torch.utils.data.DataLoader

    Raises
    ------
    TypeError
        Raises a TypeError if the input format is not supported.
    """
    from torch.utils.data import DataLoader, Dataset, TensorDataset

    from lale.util.batch_data_dictionary_dataset import BatchDataDict
    from lale.util.hdf5_to_torch_dataset import HDF5TorchDataset
    from lale.util.numpy_torch_dataset import NumpyTorchDataset, numpy_collate_fn
    from lale.util.pandas_torch_dataset import PandasTorchDataset, pandas_collate_fn

    collate_fn = None
    worker_init_fn = None

    # Dispatch on the type of X; each branch builds a torch Dataset and,
    # where needed, a matching collate function.
    if isinstance(X, Dataset) and not isinstance(X, BatchDataDict):
        dataset = X
    elif isinstance(X, pd.DataFrame):
        dataset = PandasTorchDataset(X, y)
        collate_fn = pandas_collate_fn
    elif isinstance(X, scipy.sparse.csr_matrix):
        # unfortunately, NumpyTorchDataset won't accept a subclass of np.ndarray
        X = X.toarray()  # type: ignore
        if isinstance(y, lale.datasets.data_schemas.NDArrayWithSchema):
            y = y.view(np.ndarray)
        dataset = NumpyTorchDataset(X, y)
        collate_fn = numpy_collate_fn
    elif isinstance(X, np.ndarray):
        # unfortunately, NumpyTorchDataset won't accept a subclass of np.ndarray
        if isinstance(X, lale.datasets.data_schemas.NDArrayWithSchema):
            X = X.view(np.ndarray)
        if isinstance(y, lale.datasets.data_schemas.NDArrayWithSchema):
            y = y.view(np.ndarray)
        dataset = NumpyTorchDataset(X, y)
        collate_fn = numpy_collate_fn
    elif isinstance(X, str):  # Assume that this is path to hdf5 file
        dataset = HDF5TorchDataset(X)
    elif isinstance(X, BatchDataDict):
        dataset = X

        def my_collate_fn(batch):
            return batch[
                0
            ]  # because BatchDataDict's get_item returns a batch, so no collate is required.

        # batch_size is fixed at 1 here because each item already IS a batch
        return DataLoader(
            dataset, batch_size=1, collate_fn=my_collate_fn, shuffle=shuffle
        )
    elif isinstance(X, dict):  # Assumed that it is data indexed by batch number
        if "dataset" in X:
            dataset = X["dataset"]
            collate_fn = X.get("collate_fn", None)
            worker_init_fn = getattr(dataset, "worker_init_fn", None)
        else:
            # plain dict without "dataset": treat it as a single batch
            return [X]
    elif isinstance(X, torch.Tensor) and y is not None:
        if isinstance(y, np.ndarray):
            y = torch.from_numpy(y)
        dataset = TensorDataset(X, y)
    elif isinstance(X, torch.Tensor):
        dataset = TensorDataset(X)
    else:
        raise TypeError(
            f"Can not create a data loader for a dataset with type {type(X)}"
        )
    return DataLoader(
        dataset,
        batch_size=batch_size,
        collate_fn=collate_fn,
        num_workers=num_workers,
        worker_init_fn=worker_init_fn,
        shuffle=shuffle,
    )
def write_batch_output_to_file(
    file_obj,
    file_path,
    total_len,
    batch_idx,
    batch_X,
    batch_y,
    batch_out_X,
    batch_out_y,
):
    """Append one batch of transform output to an HDF5 file.

    On the first call (``file_obj`` is None), opens ``file_path`` for
    writing and pre-creates the "X" (and, if labels are present, "y")
    datasets, sizing them from the first batch. Subsequent calls reuse the
    returned ``file_obj`` and write the batch at the slot given by
    ``batch_idx``. Returns the open h5py File object.
    """
    if file_obj is None and file_path is None:
        raise ValueError("Only one of the file object or file path can be None.")
    if file_obj is None:
        import h5py

        file_obj = h5py.File(file_path, "w")
        # estimate the size of the dataset based on the first batch output size
        transform_ratio = int(len(batch_out_X) / len(batch_X))
        if len(batch_out_X.shape) == 1:
            h5_data_shape = (transform_ratio * total_len,)
        elif len(batch_out_X.shape) == 2:
            h5_data_shape = (transform_ratio * total_len, batch_out_X.shape[1])
        elif len(batch_out_X.shape) == 3:
            h5_data_shape = (
                transform_ratio * total_len,
                batch_out_X.shape[1],
                batch_out_X.shape[2],
            )
        else:
            raise ValueError(
                "batch_out_X is expected to be a 1-d, 2-d or 3-d array. Any other data types are not handled."
            )
        dataset = file_obj.create_dataset(
            name="X", shape=h5_data_shape, chunks=True, compression="gzip"
        )
        # if the transform did not produce labels, fall back to the inputs'
        if batch_out_y is None and batch_y is not None:
            batch_out_y = batch_y
        if batch_out_y is not None:
            if len(batch_out_y.shape) == 1:
                h5_labels_shape = (transform_ratio * total_len,)
            elif len(batch_out_y.shape) == 2:
                h5_labels_shape = (transform_ratio * total_len, batch_out_y.shape[1])
            else:
                raise ValueError(
                    "batch_out_y is expected to be a 1-d or 2-d array. Any other data types are not handled."
                )
            dataset = file_obj.create_dataset(
                name="y", shape=h5_labels_shape, chunks=True, compression="gzip"
            )
    # write this batch's slice of X (and y, when labels exist)
    dataset = file_obj["X"]
    dataset[batch_idx * len(batch_out_X) : (batch_idx + 1) * len(batch_out_X)] = (
        batch_out_X
    )
    if batch_out_y is not None or batch_y is not None:
        labels = file_obj["y"]
        if batch_out_y is not None:
            labels[
                batch_idx * len(batch_out_y) : (batch_idx + 1) * len(batch_out_y)
            ] = batch_out_y
        else:
            labels[batch_idx * len(batch_y) : (batch_idx + 1) * len(batch_y)] = batch_y
    return file_obj
def add_missing_values(orig_X, missing_rate=0.1, seed=None):
    """Return a copy of ``orig_X`` where a fraction of rows have one NaN cell.

    See scikit-learn.org/stable/auto_examples/impute/plot_missing_values.html

    Parameters
    ----------
    orig_X : np.ndarray or pd.DataFrame
        Input features; not modified (a copy is returned).
    missing_rate : float, default 0.1
        Fraction of samples that receive exactly one missing feature each.
    seed : int or None, default None
        Seed for the random generator; None draws fresh OS entropy.

    Returns
    -------
    Same type as ``orig_X``, with ``int(n_samples * missing_rate)`` cells
    replaced by ``np.nan``.
    """
    n_samples, n_features = orig_X.shape
    n_missing_samples = int(n_samples * missing_rate)
    # RandomState(None) seeds from OS entropy, so no branch on seed is needed
    rng = np.random.RandomState(seed)
    # boolean mask selecting which rows get a missing value
    missing_samples = np.zeros(n_samples, dtype=bool)
    missing_samples[:n_missing_samples] = True
    rng.shuffle(missing_samples)
    # one randomly chosen feature column per affected row
    missing_features = rng.randint(0, n_features, n_missing_samples)
    missing_X = orig_X.copy()
    if isinstance(missing_X, np.ndarray):
        missing_X[missing_samples, missing_features] = np.nan
    else:
        assert isinstance(missing_X, pd.DataFrame)
        i_missing_sample = 0
        for i_sample in range(n_samples):
            if missing_samples[i_sample]:
                i_feature = missing_features[i_missing_sample]
                i_missing_sample += 1
                missing_X.iloc[i_sample, i_feature] = np.nan
    return missing_X
# helpers for manipulating (extended) sklearn style paths.
# documentation of the path format is part of the operators module docstring
def partition_sklearn_params(
    d: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Dict[str, Any]]]:
    """Split flat sklearn-style params into top-level and per-component groups.

    A key without ``__`` goes into the first result; a key ``"op__param"`` is
    recorded as ``param`` inside the second result's bucket for ``op``.
    """
    top_level: Dict[str, Any] = {}
    grouped: Dict[str, Dict[str, Any]] = {}
    for key, value in d.items():
        group, sep, param = key.partition("__")
        if not sep:
            assert key not in top_level
            top_level[key] = value
        else:
            bucket = grouped.setdefault(group, {})
            assert param not in bucket
            bucket[param] = value
    return (top_level, grouped)
def partition_sklearn_choice_params(d: Dict[str, Any]) -> Tuple[int, Dict[str, Any]]:
    """Separate the discriminant (index of the chosen branch) from its params."""
    chosen: int = -1
    params: Dict[str, Any] = {}
    for key, value in d.items():
        if key != discriminant_name:
            params[unnest_choice(key)] = value
        else:
            assert chosen == -1
            chosen = int(value)
    assert chosen != -1
    return (chosen, params)
# Placeholder parameter name for search-space grids; dropped again by
# nest_HPparam once a real operator name is available.
DUMMY_SEARCH_SPACE_GRID_PARAM_NAME: str = "$"
# Key whose value is the index of the selected branch of an operator choice.
discriminant_name: str = "?"
# Prefix marking a hyperparameter as nested under an operator choice.
choice_prefix: str = "?"
# Key/markers used to encode structured (list/tuple/dict) hyperparameter values.
structure_type_name: str = "#"
structure_type_list: str = "list"
structure_type_tuple: str = "tuple"
structure_type_dict: str = "dict"
def get_name_and_index(name: str) -> Tuple[str, int]:
    """given a name of the form "name@i", returns (name, i)
    if given a name of the form "name", returns (name, 0)
    """
    base, sep, index_text = name.partition("@")
    if not sep:
        return base, 0
    return base, int(index_text)
def make_degen_indexed_name(name, index):
    """Always attach the index suffix, even when the index is 0."""
    return "{}@{}".format(name, index)
def make_indexed_name(name, index):
    """Attach the index as an ``@`` suffix, leaving index 0 implicit."""
    return name if index == 0 else f"{name}@{index}"
def make_array_index_name(index, is_tuple: bool = False):
    """Encode a positional index, doubling the marker for tuples."""
    marker = "##" if is_tuple else "#"
    return marker + str(index)
def is_numeric_structure(structure_type: str):
    """True for positionally-indexed structures (list/tuple), False for dict."""
    if structure_type == "dict":
        return False
    if structure_type in ("list", "tuple"):
        return True
    # unchanged failure mode: unknown kinds trip an AssertionError
    assert False, f"Unknown structure type {structure_type} found"
# Generic value type shared by the hyperparameter-grid nesting helpers below.
V = TypeVar("V")
def nest_HPparam(name: str, key: str):
    """Prefix *key* with *name* using the sklearn ``__`` convention."""
    if key == DUMMY_SEARCH_SPACE_GRID_PARAM_NAME:
        # the dummy placeholder is discarded once a real name is available
        return name
    return f"{name}__{key}"
def nest_HPparams(name: str, grid: Mapping[str, V]) -> Dict[str, V]:
    """Apply :func:`nest_HPparam` to every key of *grid*."""
    return {nest_HPparam(name, key): value for key, value in grid.items()}
def nest_all_HPparams(
    name: str, grids: Iterable[Mapping[str, V]]
) -> List[Dict[str, V]]:
    """Given the name of an operator in a pipeline, this transforms every key(parameter name) in the grids
    to use the operator name as a prefix (separated by __). This is the convention in scikit-learn pipelines.
    """
    return [nest_HPparams(name, single_grid) for single_grid in grids]
def nest_choice_HPparam(key: str):
    """Mark *key* as belonging to an operator choice."""
    return f"{choice_prefix}{key}"
def nest_choice_HPparams(grid: Mapping[str, V]) -> Dict[str, V]:
    """Apply :func:`nest_choice_HPparam` to every key of *grid*."""
    return {nest_choice_HPparam(key): value for key, value in grid.items()}
def nest_choice_all_HPparams(grids: Iterable[Mapping[str, V]]) -> List[Dict[str, V]]:
    """this transforms every key(parameter name) in the grids
    to be nested under a choice, using a ? as a prefix (separated by __). This is the convention in scikit-learn pipelines.
    """
    return [nest_choice_HPparams(single_grid) for single_grid in grids]
def unnest_choice(k: str) -> str:
    """Strip the choice prefix, which must be present."""
    prefix_len = len(choice_prefix)
    assert k[:prefix_len] == choice_prefix
    return k[prefix_len:]
def unnest_HPparams(k: str) -> List[str]:
    """Split an (extended) sklearn path into its ``__``-separated pieces."""
    separator = "__"
    return k.split(separator)
def are_hyperparameters_equal(hyperparam1, hyperparam2):
    """Compare two hyperparameter values, tolerating numpy arrays.

    A plain ``==`` on an ndarray yields an elementwise array, which is
    ambiguous in boolean context; collapse it with ``np.all`` whenever
    EITHER side is an array (the original only checked the first side,
    since hyperparam2 comes from a schema default and may not be an array —
    but the converse case produced an array instead of a bool).
    """
    if isinstance(hyperparam1, np.ndarray) or isinstance(hyperparam2, np.ndarray):
        return np.all(hyperparam1 == hyperparam2)
    return hyperparam1 == hyperparam2
def _is_ast_subscript(expr):
return isinstance(expr, ast.Subscript)
def _is_ast_attribute(expr):
return isinstance(expr, ast.Attribute)
def _is_ast_constant(expr):
return isinstance(expr, ast.Constant)
def _is_ast_subs_or_attr(expr):
return isinstance(expr, (ast.Subscript, ast.Attribute))
def _is_ast_call(expr):
return isinstance(expr, ast.Call)
def _is_ast_name(expr):
return isinstance(expr, ast.Name)
def _ast_func_id(expr):
if isinstance(expr, ast.Name):
return expr.id
else:
raise ValueError("function name expected")
def _is_df(df):
    """True for any supported dataframe: pandas, or Spark-with-index."""
    if _is_pandas_df(df):
        return True
    return _is_spark_df(df)
def _is_pandas_series(df):
return isinstance(df, pd.Series)
def _is_pandas_df(df):
return isinstance(df, pd.DataFrame)
def _is_pandas(df):
return isinstance(df, (pd.Series, pd.DataFrame))
def _is_spark_df(df):
    # Only Spark dataframes already wrapped with lale's index wrapper qualify.
    if not spark_installed:
        return False
    return isinstance(df, lale.datasets.data_schemas.SparkDataFrameWithIndex)
def _is_spark_df_without_index(df):
    # A raw Spark dataframe that has not been wrapped with an index yet.
    if not spark_installed:
        return False
    return isinstance(df, spark_df) and not _is_spark_df(df)
def _ensure_pandas(df) -> pd.DataFrame:
    """Convert a Spark dataframe to pandas; pass pandas data through unchanged."""
    if _is_spark_df(df):
        df = df.toPandas()
    else:
        assert _is_pandas(df), type(df)
    return df
def _get_subscript_value(subscript_expr):
if isinstance(subscript_expr.slice, ast.Constant): # for Python 3.9
subscript_value = subscript_expr.slice.value
else:
subscript_value = subscript_expr.slice.value.s # type: ignore
return subscript_value
class GenSym:
    """Generate identifiers guaranteed fresh with respect to a known name set.

    Each generated name is added to the set, so repeated calls with the same
    prefix keep producing distinct results.
    """

    def __init__(self, names: Set[str]):
        self._names = names

    def __call__(self, prefix):
        candidate = prefix
        if candidate in self._names:
            # probe prefix_0, prefix_1, ... until an unused name appears
            counter = 0
            while f"{prefix}_{counter}" in self._names:
                counter += 1
            candidate = f"{prefix}_{counter}"
        self._names |= {candidate}
        return candidate
def get_sklearn_estimator_name() -> str:
    """Some higher order sklearn operators changed the name of the nested estimator in later versions.
    This returns the appropriate version dependent parameter name.
    """
    from packaging import version

    import lale.operators

    if lale.operators.sklearn_version >= version.Version("1.2"):
        return "estimator"
    return "base_estimator"
def with_fixed_estimator_name(**kwargs):
    """Some higher order sklearn operators changed the name of the nested estimator in later versions.
    This fixes up the arguments, renaming estimator and base_estimator appropriately.
    """
    if "base_estimator" not in kwargs and "estimator" not in kwargs:
        return kwargs
    from packaging import version

    import lale.operators

    if lale.operators.sklearn_version < version.Version("1.2"):
        source, target = "estimator", "base_estimator"
    else:
        source, target = "base_estimator", "estimator"
    return {(target if key == source else key): value for key, value in kwargs.items()}
def get_estimator_param_name_from_hyperparams(hyperparams):
    """Decide whether these hyperparameters use ``estimator`` or ``base_estimator``."""
    base_est = hyperparams.get("base_estimator", "deprecated")
    use_estimator = base_est == "deprecated" or (
        base_est is None and "estimator" in hyperparams
    )
    return "estimator" if use_estimator else "base_estimator"