# Source code for lale.lib.rasl.metrics

# Copyright 2022 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from abc import abstractmethod
from typing import Dict, Iterable, Optional, Protocol, Tuple, TypeVar, Union, cast

import numpy as np
import pandas as pd
from typing_extensions import TypeAlias

from lale.expressions import astype, count, it
from lale.expressions import sum as lale_sum
from lale.helpers import spark_installed
from lale.operators import TrainedOperator

from .aggregate import Aggregate
from .group_by import GroupBy
from .map import Map
from .monoid import Monoid, MonoidFactory

# Metric monoids are plain monoids; this alias only gives them a metric-specific name.
MetricMonoid = Monoid

# Type variable for the concrete monoid type produced by a metric factory.
_M = TypeVar("_M", bound=MetricMonoid)

# A pandas batch of features and labels, in (X, y) order.
_PandasBatch: TypeAlias = Tuple[pd.DataFrame, pd.Series]

# The batch aliases are widened to accept Spark dataframes only when
# `spark_installed` is truthy, so pyspark is never imported otherwise.
if spark_installed:
    from pyspark.sql.dataframe import DataFrame as SparkDataFrame

    _SparkBatch: TypeAlias = Tuple[SparkDataFrame, SparkDataFrame]

    # (X, y) batch in either pandas or Spark form.
    _Batch_XyAux = Union[_PandasBatch, _SparkBatch]

    # (y_true, y_pred, X) batch; each label element may also be a numpy array.
    _Batch_yyXAux = Tuple[
        Union[pd.Series, np.ndarray, SparkDataFrame],
        Union[pd.Series, np.ndarray, SparkDataFrame],
        Union[pd.DataFrame, SparkDataFrame],
    ]

else:
    # pandas-only variants of the same two batch shapes.
    _Batch_XyAux = _PandasBatch  # type: ignore

    _Batch_yyXAux = Tuple[  # type: ignore
        Union[pd.Series, np.ndarray], Union[pd.Series, np.ndarray], pd.DataFrame
    ]

# pyright does not currently accept a TypeAlias with conditional definitions
# so the conditional results above are re-exported under their final names here.
_Batch_Xy: TypeAlias = _Batch_XyAux  # type: ignore
_Batch_yyX: TypeAlias = _Batch_yyXAux  # type: ignore


class MetricMonoidFactory(MonoidFactory[_Batch_yyX, float, _M], Protocol):
    """Abstract base class for factories that create metrics with an associative monoid interface."""

    @abstractmethod
    def to_monoid(self, batch: _Batch_yyX) -> _M:
        """Lift one ``(y_true, y_pred, X)`` batch into this metric's monoid."""
        ...

    @abstractmethod
    def score_data(
        self, y_true: pd.Series, y_pred: pd.Series, X: Optional[pd.DataFrame] = None
    ) -> float:
        """Score predictions against ground truth in a single, non-batched call."""
        # keeping this abstract to allow inheriting non-batched version
        ...

    @abstractmethod
    def score_estimator(
        self, estimator: TrainedOperator, X: pd.DataFrame, y: pd.Series
    ) -> float:
        """Score a trained estimator on ``X`` / ``y`` in a single, non-batched call."""
        # keeping this abstract to allow inheriting non-batched version
        ...

    def __call__(
        self, estimator: TrainedOperator, X: pd.DataFrame, y: pd.Series
    ) -> float:
        """Make the factory callable; delegates to :meth:`score_estimator`."""
        return self.score_estimator(estimator, X, y)

    def score_data_batched(self, batches: Iterable[_Batch_yyX]) -> float:
        """Score a stream of ``(y_true, y_pred, X)`` batches.

        Each batch is lifted with :meth:`to_monoid`, the monoids are folded
        together with ``combine``, and the total is converted to a float with
        ``from_monoid``.  No initial value is passed to the fold, so an empty
        ``batches`` makes ``functools.reduce`` raise ``TypeError``.
        """
        monoids = map(self.to_monoid, batches)
        total = functools.reduce(lambda acc, nxt: acc.combine(nxt), monoids)
        return self.from_monoid(total)

    def score_estimator_batched(
        self, estimator: TrainedOperator, batches: Iterable[_Batch_Xy]
    ) -> float:
        """Score ``estimator`` over a stream of ``(X, y)`` batches.

        Every batch is turned lazily into ``(y, estimator.predict(X), X)`` and
        the resulting stream is handed to :meth:`score_data_batched`.
        """
        with_predictions = (
            (target, estimator.predict(features), features)
            for features, target in batches
        )
        return self.score_data_batched(with_predictions)
class _MetricMonoidMixin(MetricMonoidFactory[_M], Protocol):
    """Mixin that implements the non-batched scoring methods via the monoid.

    Subclasses still have to supply ``to_monoid`` and ``from_monoid``.
    """

    # pylint:disable=abstract-method
    # This is an abstract class as well
    def score_data(
        self, y_true: pd.Series, y_pred: pd.Series, X: Optional[pd.DataFrame] = None
    ) -> float:
        """Score one batch by lifting it to a monoid and converting it back."""
        return self.from_monoid(self.to_monoid((y_true, y_pred, X)))

    def score_estimator(
        self, estimator: TrainedOperator, X: pd.DataFrame, y: pd.Series
    ) -> float:
        """Predict on ``X`` with ``estimator`` and score the result against ``y``."""
        return self.score_data(y_true=y, y_pred=estimator.predict(X), X=X)


def _make_dataframe_yy(batch):
    """Build a two-column pandas frame (``y_true``, ``y_pred``) from a batch.

    ``batch`` is a ``(y_true, y_pred, X)`` triple; ``X`` is ignored.  Each label
    element may be a numpy array, a pandas Series, a single-column pandas
    DataFrame or (when Spark is installed) a Spark DataFrame.  Both columns get
    a fresh 0..n-1 index so they align row by row regardless of their
    original indices.

    Raises:
        AssertionError: if a label element cannot be converted to a Series
            (for example a DataFrame with more than one column).
    """

    def make_series_y(y):
        if isinstance(y, np.ndarray):
            series = pd.Series(y)
        elif isinstance(y, pd.DataFrame):
            # Squeeze the column axis only.  A bare squeeze() collapses a
            # one-row, one-column frame to a scalar (failing the assert
            # below for single-row batches) and turns a one-row,
            # multi-column frame into a Series of that row's values.
            series = y.squeeze(axis="columns")
        elif spark_installed and isinstance(y, SparkDataFrame):
            series = cast(pd.DataFrame, y.toPandas()).squeeze(axis="columns")
        else:
            series = y
        assert isinstance(series, pd.Series), type(series)
        return series.reset_index(drop=True)

    y_true, y_pred, _ = batch
    result = pd.DataFrame(
        {"y_true": make_series_y(y_true), "y_pred": make_series_y(y_pred)},
    )
    return result


class _AccuracyData(MetricMonoid):
    """Monoid for accuracy: number of matching rows and total number of rows."""

    def __init__(self, match: int, total: int):
        self.match = match
        self.total = total

    def combine(self, other: "_AccuracyData") -> "_AccuracyData":
        """Return a new monoid with both counters added component-wise."""
        return _AccuracyData(self.match + other.match, self.total + other.total)


class _Accuracy(_MetricMonoidMixin[_AccuracyData]):
    """Accuracy metric computed with a Map >> Aggregate pipeline."""

    def __init__(self):
        # Flag rows where y_true equals y_pred as 1, then sum the flags
        # ("match") and count the rows ("total").
        self._pipeline = Map(
            columns={"match": astype("int", it.y_true == it.y_pred)}
        ) >> Aggregate(columns={"match": lale_sum(it.match), "total": count(it.match)})

    def to_monoid(self, batch: _Batch_yyX) -> _AccuracyData:
        """Reduce one batch to its match and total counts."""
        input_df = _make_dataframe_yy(batch)
        agg_df = self._pipeline.transform(input_df)
        return _AccuracyData(match=agg_df.at[0, "match"], total=agg_df.at[0, "total"])

    def from_monoid(self, monoid: _AccuracyData) -> float:
        """Return ``match / total`` as a Python float."""
        # Dividing by np.float64 avoids ZeroDivisionError when total is 0.
        return float(monoid.match / np.float64(monoid.total))
def accuracy_score(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Drop-in replacement for sklearn's `accuracy_score`_ function.

    Looks up the ``"accuracy"`` scorer and applies it to the given labels.

    .. _`accuracy_score`: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.accuracy_score.html
    """
    scorer = get_scorer("accuracy")
    return scorer.score_data(y_true, y_pred)
class _BalancedAccuracyData(MetricMonoid):
    """Monoid for balanced accuracy: per-label true-positive / false-negative counts."""

    def __init__(self, true_pos: Dict[str, int], false_neg: Dict[str, int]):
        self.true_pos = true_pos
        self.false_neg = false_neg

    def combine(self, other: "_BalancedAccuracyData") -> "_BalancedAccuracyData":
        """Add the counts label by label; a label missing on one side counts as 0."""
        keys = set(self.true_pos.keys()) | set(other.true_pos.keys())
        merged_tp = {}
        merged_fn = {}
        for label in keys:
            merged_tp[label] = self.true_pos.get(label, 0) + other.true_pos.get(
                label, 0
            )
            merged_fn[label] = self.false_neg.get(label, 0) + other.false_neg.get(
                label, 0
            )
        return _BalancedAccuracyData(merged_tp, merged_fn)


class _BalancedAccuracy(_MetricMonoidMixin[_BalancedAccuracyData]):
    """Balanced accuracy, i.e. the unweighted mean of the per-label recalls."""

    def __init__(self):
        # Step 1: flag every row as a hit or a miss for its true label.
        flag_rows = Map(
            columns={
                "y_true": it.y_true,
                "true_pos": astype("int", (it.y_pred == it.y_true)),
                "false_neg": astype("int", (it.y_pred != it.y_true)),
            }
        )
        # Step 2: group the rows by their true label.
        by_true_label = GroupBy(by=[it.y_true])
        # Step 3: sum the hit / miss flags within each group.
        sum_flags = Aggregate(
            columns={
                "true_pos": lale_sum(it.true_pos),
                "false_neg": lale_sum(it.false_neg),
            }
        )
        self._pipeline = flag_rows >> by_true_label >> sum_flags

    def to_monoid(self, batch: _Batch_yyX) -> _BalancedAccuracyData:
        """Reduce one batch to per-label counts keyed by the aggregate's index."""
        counts = self._pipeline.transform(_make_dataframe_yy(batch))
        labels = counts.index
        hits = {label: counts.at[label, "true_pos"] for label in labels}
        misses = {label: counts.at[label, "false_neg"] for label in labels}
        return _BalancedAccuracyData(true_pos=hits, false_neg=misses)

    def from_monoid(self, monoid: _BalancedAccuracyData) -> float:
        """Return the mean over labels of ``true_pos / (true_pos + false_neg)``."""
        per_label_recall = [
            hits / (hits + monoid.false_neg[label])
            for label, hits in monoid.true_pos.items()
        ]
        mean_recall = sum(per_label_recall) / len(per_label_recall)
        return float(mean_recall)
def balanced_accuracy_score(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Drop-in replacement for sklearn's `balanced_accuracy_score`_ function.

    Looks up the ``"balanced_accuracy"`` scorer and applies it to the given labels.

    .. _`balanced_accuracy_score`: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.balanced_accuracy_score.html
    """
    scorer = get_scorer("balanced_accuracy")
    return scorer.score_data(y_true, y_pred)
class _F1Data(MetricMonoid):
    """Monoid for F1: true-positive, false-positive and false-negative counts."""

    def __init__(self, true_pos: int, false_pos: int, false_neg: int):
        self.true_pos = true_pos
        self.false_pos = false_pos
        self.false_neg = false_neg

    def combine(self, other: "_F1Data") -> "_F1Data":
        """Return a new monoid with all three counters added component-wise."""
        return _F1Data(
            self.true_pos + other.true_pos,
            self.false_pos + other.false_pos,
            self.false_neg + other.false_neg,
        )


class _F1(_MetricMonoidMixin[_F1Data]):
    """Binary F1 metric with respect to a configurable positive label."""

    def __init__(self, pos_label: Union[int, float, str] = 1):
        # Flag each row as TP / FP / FN relative to pos_label, then sum the
        # flags over the whole batch.
        self._pipeline = Map(
            columns={
                "true_pos": astype(
                    "int", (it.y_pred == pos_label) & (it.y_true == pos_label)
                ),
                "false_pos": astype(
                    "int", (it.y_pred == pos_label) & (it.y_true != pos_label)
                ),
                "false_neg": astype(
                    "int", (it.y_pred != pos_label) & (it.y_true == pos_label)
                ),
            }
        ) >> Aggregate(
            columns={
                "true_pos": lale_sum(it.true_pos),
                "false_pos": lale_sum(it.false_pos),
                "false_neg": lale_sum(it.false_neg),
            }
        )

    def to_monoid(self, batch: _Batch_yyX) -> _F1Data:
        """Reduce one batch to its TP / FP / FN counts."""
        input_df = _make_dataframe_yy(batch)
        agg_df = self._pipeline.transform(input_df)
        return _F1Data(
            true_pos=agg_df.at[0, "true_pos"],
            false_pos=agg_df.at[0, "false_pos"],
            false_neg=agg_df.at[0, "false_neg"],
        )

    def from_monoid(self, monoid: _F1Data) -> float:
        """Return ``2*TP / (2*TP + FP + FN)``.

        When all three counts are zero the score is undefined; in that case a
        ``RuntimeWarning`` is issued and 0.0 is returned, rather than NaN or
        a ``ZeroDivisionError`` from the division.
        """
        two_tp = monoid.true_pos + monoid.true_pos
        denominator = two_tp + monoid.false_pos + monoid.false_neg
        if denominator == 0:
            import warnings

            warnings.warn(
                "F1 is ill-defined when there are no true positives, false "
                "positives or false negatives; returning 0.0",
                RuntimeWarning,
                stacklevel=2,
            )
            return 0.0
        return float(two_tp / denominator)
def f1_score(
    y_true: pd.Series, y_pred: pd.Series, pos_label: Union[int, float, str] = 1
) -> float:
    """Drop-in replacement for sklearn's `f1_score`_ function.

    Builds an ``"f1"`` scorer for ``pos_label`` and applies it to the given labels.

    .. _`f1_score`: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html
    """
    scorer = get_scorer("f1", pos_label=pos_label)
    return scorer.score_data(y_true, y_pred)
class _R2Data(MetricMonoid):
    """Monoid for R2: row count, sum and sum of squares of y_true, residual sum of squares."""

    def __init__(self, n: int, tot_sum: float, tot_sum_sq: float, res_sum_sq: float):
        self.n = n
        self.sum = tot_sum
        self.sum_sq = tot_sum_sq
        self.res_sum_sq = res_sum_sq

    def combine(self, other: "_R2Data") -> "_R2Data":
        """Return a new monoid with all four accumulators added component-wise."""
        return _R2Data(
            self.n + other.n,
            self.sum + other.sum,
            self.sum_sq + other.sum_sq,
            self.res_sum_sq + other.res_sum_sq,
        )


class _R2(_MetricMonoidMixin[_R2Data]):
    """Coefficient of determination computed from running sums."""

    # https://en.wikipedia.org/wiki/Coefficient_of_determination
    def __init__(self):
        per_row = Map(
            columns={
                "y": it.y_true,  # observed values
                "f": it.y_pred,  # predicted values
                "y2": it.y_true * it.y_true,  # squares
                "e2": (it.y_true - it.y_pred) * (it.y_true - it.y_pred),
            }
        )
        totals = Aggregate(
            columns={
                "n": count(it.y),
                "sum": lale_sum(it.y),
                "sum_sq": lale_sum(it.y2),
                "res_sum_sq": lale_sum(it.e2),  # residual sum of squares
            }
        )
        self._pipeline = per_row >> totals

    def to_monoid(self, batch: _Batch_yyX) -> _R2Data:
        """Reduce one batch to its count and three running sums."""
        sums = self._pipeline.transform(_make_dataframe_yy(batch))
        return _R2Data(
            n=sums.at[0, "n"],
            tot_sum=sums.at[0, "sum"],
            tot_sum_sq=sums.at[0, "sum_sq"],
            res_sum_sq=sums.at[0, "res_sum_sq"],
        )

    def from_monoid(self, monoid: _R2Data) -> float:
        """Return ``1 - res_sum_sq / ss_tot``.

        ``ss_tot`` is the total sum of squares, derived from the running sums
        as ``sum_sq - sum**2 / n``.
        """
        # NOTE(review): ss_tot is 0 when y_true is constant; the quotient is
        # then not a finite number - confirm whether callers rely on that.
        squared_sum_over_n = monoid.sum * monoid.sum / np.float64(monoid.n)
        ss_tot = monoid.sum_sq - squared_sum_over_n
        unexplained = float(monoid.res_sum_sq / ss_tot)
        return 1 - unexplained
def r2_score(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Drop-in replacement for sklearn's `r2_score`_ function.

    Looks up the ``"r2"`` scorer and applies it to the given values.

    .. _`r2_score`: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.r2_score.html
    """
    scorer = get_scorer("r2")
    return scorer.score_data(y_true, y_pred)
# Lazily filled cache of the parameterless scorers, keyed by scoring name.
# Every entry starts out as None and is replaced by get_scorer on first use.
_scorer_cache: Dict[str, Optional[MetricMonoidFactory]] = dict.fromkeys(
    ("accuracy", "balanced_accuracy", "r2")
)
def get_scorer(scoring: str, **kwargs) -> MetricMonoidFactory:
    """Drop-in replacement for sklearn's `get_scorer`_ function.

    ``"f1"`` always builds a fresh scorer from ``kwargs``.  The other
    supported names (``"accuracy"``, ``"balanced_accuracy"``, ``"r2"``) are
    created on first request and then served from a module-level cache;
    ``kwargs`` is not used for them.

    .. _`get_scorer`: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.get_scorer.html
    """
    # F1 depends on its keyword arguments, so it is never cached.
    if scoring == "f1":
        return _F1(**kwargs)

    assert scoring in _scorer_cache, scoring
    scorer = _scorer_cache[scoring]
    if scorer is None:
        constructors = {
            "accuracy": _Accuracy,
            "balanced_accuracy": _BalancedAccuracy,
            "r2": _R2,
        }
        make_scorer = constructors.get(scoring)
        if make_scorer is not None:
            scorer = make_scorer()
            _scorer_cache[scoring] = scorer
    assert scorer is not None
    return scorer