# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, List, Optional, Tuple, Type, Union
import numpy as np
from numpy import issubdtype, ndarray
from pandas import DataFrame, Series
from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
from scipy.sparse import csr_matrix
import lale.type_checking
from lale.helpers import _is_spark_df
from lale.type_checking import JSON_TYPE
try:
import torch
from torch import Tensor
torch_installed = True
except ImportError:
torch_installed = False
try:
import py4j.protocol
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import GroupedData as SparkGroupedData
spark_installed = True
except ImportError:
spark_installed = False
# See instructions for subclassing numpy ndarray:
# https://docs.scipy.org/doc/numpy/user/basics.subclassing.html
class NDArrayWithSchema(ndarray):
def __new__(
cls,
shape,
dtype=float,
buffer=None,
offset=0,
strides=None,
order=None,
json_schema=None,
table_name=None,
):
result = super( # pylint:disable=too-many-function-args
NDArrayWithSchema, cls
).__new__(
cls, shape, dtype, buffer, offset, strides, order # type: ignore
)
result.json_schema = json_schema
result.table_name = table_name
return result
def __array_finalize__(self, obj):
if obj is None:
return
self.json_schema = getattr(obj, "json_schema", None)
self.table_name = getattr(obj, "table_name", None)
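# A hedged usage sketch (illustrative, not asserted by this module): viewing
# an existing ndarray as NDArrayWithSchema adds the ``json_schema`` and
# ``table_name`` slots, and __array_finalize__ carries them through views.
#
#   import numpy as np
#   a = np.zeros((2, 3)).view(NDArrayWithSchema)
#   a.json_schema = {"type": "array"}   # any JSON schema dict
#   b = a[0]                            # a view; attributes are propagated
#   assert b.json_schema == a.json_schema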
# See instructions for subclassing pandas DataFrame:
# https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas
class DataFrameWithSchema(DataFrame):
_internal_names = DataFrame._internal_names + ["json_schema", "table_name"]
_internal_names_set = set(_internal_names)
@property
def _constructor(self):
return DataFrameWithSchema
class SeriesWithSchema(Series):
_internal_names = DataFrame._internal_names + [
"json_schema",
"table_name",
"folds_for_monoid",
]
_internal_names_set = set(_internal_names)
@property
def _constructor(self):
return SeriesWithSchema
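# A hedged sketch of why _constructor is overridden: pandas calls it when it
# builds new frames or series from existing ones, so slicing keeps the subclass.
#
#   df = DataFrameWithSchema({"x": [1, 2], "y": [3.0, 4.0]})
#   assert isinstance(df[["x"]], DataFrameWithSchema)
#   s = SeriesWithSchema([1, 2, 3])
#   assert isinstance(s[:2], SeriesWithSchema)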
if spark_installed:
def _gen_index_name(df, cpt=None):
name = f"index{cpt if cpt is not None else ''}"
if name in df.columns:
return _gen_index_name(df, cpt=cpt + 1 if cpt is not None else 0)
else:
return name
class SparkDataFrameWithIndex(SparkDataFrame): # type: ignore
def __init__(self, df, index_names=None):
if index_names is not None and len(index_names) == 1:
index_name = index_names[0]
elif index_names is None or len(index_names) == 0:
index_name = _gen_index_name(df)
index_names = [index_name]
else:
index_name = None
if index_name is not None and index_name not in df.columns:
df_with_index = (
df.rdd.zipWithIndex()
.map(lambda row: row[0] + (row[1],))
.toDF(df.columns + [index_name])
)
else:
df_with_index = df
table_name = get_table_name(df)
if table_name is not None:
df_with_index = df_with_index.alias(table_name)
super().__init__(df_with_index._jdf, df_with_index.sql_ctx)
self.index_name = index_name
self.index_names = index_names
for f in df.schema.fieldNames():
self.schema[f].metadata = df.schema[f].metadata
def drop_indexes(self):
result = self.drop(*self.index_names)
result = add_table_name(result, get_table_name(self))
return result
@property
def columns_without_indexes(self):
cols = list(self.columns)
for name in self.index_names:
cols.remove(name)
return cols
def toPandas(self, *args, **kwargs):
df = super().toPandas(*args, **kwargs)
return df.set_index(self.index_names)
else:
class SparkDataFrameWithIndex: # type: ignore
def __init__(self, df, index_names=None) -> None:
raise ValueError("pyspark is not installed") # type: ignore
@property
def index_name(self) -> Union[str, None]:
raise ValueError("pyspark is not installed") # type: ignore
@property
def index_names(self) -> List[str]:
raise ValueError("pyspark is not installed") # type: ignore
def toPandas(self, *args, **kwargs) -> DataFrame:
raise ValueError("pyspark is not installed") # type: ignore
@property
def schema(self) -> Any:
raise ValueError("pyspark is not installed") # type: ignore
def add_schema(obj, schema=None, raise_on_failure=False, recalc=False) -> Any:
from lale.settings import disable_data_schema_validation
if disable_data_schema_validation:
return obj
if obj is None:
return None
if isinstance(obj, NDArrayWithSchema):
result = obj
elif isinstance(obj, ndarray):
result = obj.view(NDArrayWithSchema)
elif isinstance(obj, SeriesWithSchema):
result = obj
elif isinstance(obj, Series):
result = SeriesWithSchema(obj)
elif isinstance(obj, DataFrameWithSchema):
result = obj
elif isinstance(obj, DataFrame):
result = DataFrameWithSchema(obj)
elif is_list_tensor(obj):
obj = np.array(obj)
result = obj.view(NDArrayWithSchema)
elif raise_on_failure:
raise ValueError(f"unexpected type(obj) {type(obj)}")
else:
return obj
if recalc:
setattr(result, "json_schema", None)
if getattr(result, "json_schema", None) is None:
if schema is None:
setattr(result, "json_schema", to_schema(obj))
else:
lale.type_checking.validate_is_schema(schema)
setattr(result, "json_schema", schema)
return result
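# Minimal usage sketch, assuming schema validation is enabled in
# lale.settings (otherwise add_schema returns its argument unchanged):
#
#   import numpy as np
#   X = np.array([[1, 2], [3, 4]])
#   Xs = add_schema(X)          # NDArrayWithSchema view of X
#   Xs.json_schema              # schema inferred by to_schema(X)
#   Ys = add_schema(X, schema={
#       "type": "array",
#       "items": {"type": "array", "items": {"type": "integer"}}})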
def add_schema_adjusting_n_rows(obj, schema):
assert isinstance(obj, (ndarray, DataFrame, Series)), type(obj)
assert schema.get("type", None) == "array", schema
n_rows = obj.shape[0]
mod_schema = {**schema, "minItems": n_rows, "maxItems": n_rows}
result = add_schema(obj, mod_schema)
return result
def add_table_name(obj, name) -> Any:
if obj is None:
return None
if name is None:
return obj
if spark_installed and isinstance(obj, SparkDataFrame):
# alias method documentation: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.alias.html
# Python class DataFrame with method alias(self, alias): https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py
# Scala type DataFrame: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/package.scala
# Scala class DataSet with method as(alias: String): https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
o = obj.alias(name)
for f in obj.schema.fieldNames():
o.schema[f].metadata = obj.schema[f].metadata
if isinstance(obj, SparkDataFrameWithIndex):
o = SparkDataFrameWithIndex(o, obj.index_names)
return o
if isinstance(obj, NDArrayWithSchema):
result = obj.view(NDArrayWithSchema)
if hasattr(obj, "json_schema"):
result.json_schema = obj.json_schema
elif isinstance(obj, ndarray):
result = obj.view(NDArrayWithSchema)
elif isinstance(obj, SeriesWithSchema):
result = obj.copy(deep=False)
if hasattr(obj, "json_schema"):
result.json_schema = obj.json_schema
elif isinstance(obj, Series):
result = SeriesWithSchema(obj)
elif isinstance(obj, DataFrameWithSchema):
result = obj.copy(deep=False)
if hasattr(obj, "json_schema"):
result.json_schema = obj.json_schema
elif isinstance(obj, DataFrame):
result = DataFrameWithSchema(obj)
elif is_list_tensor(obj):
obj = np.array(obj)
result = obj.view(NDArrayWithSchema)
elif isinstance(obj, (DataFrameGroupBy, SeriesGroupBy)):
result = obj
elif spark_installed and isinstance(obj, SparkGroupedData):
result = obj
else:
raise ValueError(f"unexpected type(obj) {type(obj)}")
setattr(result, "table_name", name)
return result
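# Hedged sketch: tagging a pandas DataFrame with the name of the table it
# came from, so relational operators can later refer to it by name.
#
#   import pandas as pd
#   df = pd.DataFrame({"x": [1, 2]})
#   named = add_table_name(df, "my_table")   # a DataFrameWithSchema built from df
#   get_table_name(named)                    # 'my_table'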
def get_table_name(obj):
if spark_installed and isinstance(obj, SparkDataFrame):
# Python class DataFrame with field self._jdf: https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py
# Scala type DataFrame: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/package.scala
# Scala class DataSet with field queryExecution: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
# Scala fields turn into Java nullary methods
# Py4J exposes Java methods as Python methods
# Scala class QueryExecution with field analyzed: LogicalPlan: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
spark_query = obj._jdf.queryExecution().analyzed() # type: ignore
try:
# calling spark_df.explain("extended") shows the analyzed contents
# after spark_df.alias("foo"), analyzed contents should be SubqueryAlias
# Scala class SubqueryAlias with field identifier: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
# str(..) converts the Java string into a Python string
result = str(spark_query.identifier())
except py4j.protocol.Py4JError:
result = None
return result
if isinstance(
obj,
(
NDArrayWithSchema,
SeriesWithSchema,
DataFrameWithSchema,
DataFrameGroupBy,
SeriesGroupBy,
),
) or (spark_installed and isinstance(obj, SparkGroupedData)):
return getattr(obj, "table_name", None)
return None
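# Hedged Spark sketch (requires pyspark and a SparkSession named ``spark``):
# for Spark DataFrames the name is recovered from the SubqueryAlias node that
# ``alias`` leaves in the analyzed logical plan, not from a Python attribute.
#
#   sdf = spark.createDataFrame([(1, "a")], ["num", "txt"]).alias("t1")
#   get_table_name(sdf)   # expected to yield 't1'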
def get_index_name(obj):
result = None
if spark_installed and isinstance(obj, SparkDataFrameWithIndex):
result = obj.index_name
elif isinstance(
obj,
(
SeriesWithSchema,
DataFrameWithSchema,
DataFrameGroupBy,
SeriesGroupBy,
),
):
result = obj.index.name
return result
def get_index_names(obj):
result = None
if spark_installed and isinstance(obj, SparkDataFrameWithIndex):
result = obj.index_names
elif isinstance(
obj,
(
SeriesWithSchema,
DataFrameWithSchema,
DataFrameGroupBy,
SeriesGroupBy,
),
):
result = obj.index.names
return result
def strip_schema(obj):
if isinstance(obj, NDArrayWithSchema):
result = np.array(obj)
assert type(result) is ndarray # pylint:disable=unidiomatic-typecheck
elif isinstance(obj, SeriesWithSchema):
result = Series(obj)
assert type(result) is Series # pylint:disable=unidiomatic-typecheck
elif isinstance(obj, DataFrameWithSchema):
result = DataFrame(obj)
assert type(result) is DataFrame # pylint:disable=unidiomatic-typecheck
else:
result = obj
return result
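# Sketch: strip_schema undoes add_schema / add_table_name, handing back stock
# numpy/pandas types for libraries that dislike subclasses.
#
#   import numpy as np
#   Xs = add_schema(np.ones((2, 2)))
#   assert type(strip_schema(Xs)) is np.ndarray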
def _dtype_to_schema(typ) -> JSON_TYPE:
result: JSON_TYPE
if typ is bool or issubdtype(typ, np.bool_):
result = {"type": "boolean"}
elif issubdtype(typ, np.unsignedinteger):
result = {"type": "integer", "minimum": 0}
elif issubdtype(typ, np.integer):
result = {"type": "integer"}
elif issubdtype(typ, np.number):
result = {"type": "number"}
elif issubdtype(typ, np.string_) or issubdtype(typ, np.unicode_):
result = {"type": "string"}
elif isinstance(typ, np.dtype):
if typ.fields:
props = {k: _dtype_to_schema(t) for k, t in typ.fields.items()}
result = {"type": "object", "properties": props}
elif typ.shape:
result = _shape_and_dtype_to_schema(typ.shape, typ.subdtype)
elif issubdtype(typ, np.object_):
result = {"type": "string"}
else:
assert False, f"unexpected dtype {typ}"
else:
assert False, f"unexpected non-dtype {typ}"
return result
def dtype_to_schema(typ) -> JSON_TYPE:
result = _dtype_to_schema(typ)
lale.type_checking.validate_is_schema(result)
return result
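# Illustrative dtype-to-schema mappings (a sketch of the intent, not a test):
#
#   import numpy as np
#   dtype_to_schema(np.dtype("int64"))    # {'type': 'integer'}
#   dtype_to_schema(np.dtype("uint8"))    # {'type': 'integer', 'minimum': 0}
#   dtype_to_schema(np.dtype("float32"))  # {'type': 'number'}
#   dtype_to_schema(np.dtype("<U10"))     # {'type': 'string'}
#   dtype_to_schema(np.dtype(bool))       # {'type': 'boolean'}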
def _shape_and_dtype_to_schema(shape, dtype) -> JSON_TYPE:
result = _dtype_to_schema(dtype)
for dim in reversed(shape):
result = {"type": "array", "minItems": dim, "maxItems": dim, "items": result}
return result
def shape_and_dtype_to_schema(shape, dtype) -> JSON_TYPE:
result = _shape_and_dtype_to_schema(shape, dtype)
lale.type_checking.validate_is_schema(result)
return result
def list_tensor_to_shape_and_dtype(ls) -> Optional[Tuple[Tuple[int, ...], Type]]:
if isinstance(ls, (int, float, str)):
return ((), type(ls))
if isinstance(ls, list):
sub_result: Any = "Any"
for item in ls:
item_result = list_tensor_to_shape_and_dtype(item)
if item_result is None:
return None
if sub_result == "Any":
sub_result = item_result
elif sub_result != item_result:
return None
if sub_result == "Any" and len(ls) == 0:
return ((len(ls),) + (), int)
sub_shape, sub_dtype = sub_result
return ((len(ls),) + sub_shape, sub_dtype)
return None
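# Sketch of the recursive shape/dtype inference over nested Python lists:
#
#   list_tensor_to_shape_and_dtype([[1, 2], [3, 4]])  # ((2, 2), int)
#   list_tensor_to_shape_and_dtype([1.0, 2.0, 3.0])   # ((3,), float)
#   list_tensor_to_shape_and_dtype([[1, 2], [3]])     # None, ragged rows
#   list_tensor_to_shape_and_dtype([1, "a"])          # None, mixed leaf types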
def is_list_tensor(obj) -> bool:
if isinstance(obj, list):
shape_and_dtype = list_tensor_to_shape_and_dtype(obj)
return shape_and_dtype is not None
return False
def _list_tensor_to_schema(ls) -> Optional[JSON_TYPE]:
shape_and_dtype = list_tensor_to_shape_and_dtype(ls)
if shape_and_dtype is None:
return None
result = _shape_and_dtype_to_schema(*shape_and_dtype)
return result
def list_tensor_to_schema(ls) -> Optional[JSON_TYPE]:
result = _list_tensor_to_schema(ls)
if result is None:
return None
lale.type_checking.validate_is_schema(result)
return result
def _ndarray_to_schema(array) -> JSON_TYPE:
assert isinstance(array, ndarray)
if (
isinstance(array, NDArrayWithSchema)
and hasattr(array, "json_schema")
and array.json_schema is not None
):
return array.json_schema
return _shape_and_dtype_to_schema(array.shape, array.dtype)
def ndarray_to_schema(array) -> JSON_TYPE:
result = _ndarray_to_schema(array)
lale.type_checking.validate_is_schema(result)
return result
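# Sketch: a 2x3 float array becomes a doubly nested array schema with exact
# minItems/maxItems bounds at each level.
#
#   import numpy as np
#   ndarray_to_schema(np.zeros((2, 3)))
#   # {'type': 'array', 'minItems': 2, 'maxItems': 2,
#   #  'items': {'type': 'array', 'minItems': 3, 'maxItems': 3,
#   #            'items': {'type': 'number'}}}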
def _csr_matrix_to_schema(matrix) -> JSON_TYPE:
assert isinstance(matrix, csr_matrix)
result = _shape_and_dtype_to_schema(matrix.shape, matrix.dtype)
result["isSparse"] = {} # true schema
return result
def csr_matrix_to_schema(matrix) -> JSON_TYPE:
result = _csr_matrix_to_schema(matrix)
lale.type_checking.validate_is_schema(result)
return result
def _dataframe_to_schema(df) -> JSON_TYPE:
assert isinstance(df, DataFrame)
if (
isinstance(df, DataFrameWithSchema)
and hasattr(df, "json_schema")
and df.json_schema is not None
):
return df.json_schema
n_rows, n_columns = df.shape
df_dtypes = df.dtypes
assert n_columns == len(df.columns) and n_columns == len(df_dtypes)
items = [
{"description": str(col), **_dtype_to_schema(df_dtypes[col])}
for col in df.columns
]
result = {
"type": "array",
"minItems": n_rows,
"maxItems": n_rows,
"items": {
"type": "array",
"minItems": n_columns,
"maxItems": n_columns,
"items": items,
},
}
return result
def dataframe_to_schema(df) -> JSON_TYPE:
result = _dataframe_to_schema(df)
lale.type_checking.validate_is_schema(result)
return result
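# Sketch: the DataFrame schema is row-major, with one item schema per column
# and the column name carried in "description".
#
#   import pandas as pd
#   dataframe_to_schema(pd.DataFrame({"a": [1, 2], "b": [0.5, 1.5]}))
#   # {'type': 'array', 'minItems': 2, 'maxItems': 2,
#   #  'items': {'type': 'array', 'minItems': 2, 'maxItems': 2,
#   #            'items': [{'description': 'a', 'type': 'integer'},
#   #                      {'description': 'b', 'type': 'number'}]}}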
def _series_to_schema(series) -> JSON_TYPE:
assert isinstance(series, Series)
if (
isinstance(series, SeriesWithSchema)
and hasattr(series, "json_schema")
and series.json_schema is not None
):
return series.json_schema
(n_rows,) = series.shape
result = {
"type": "array",
"minItems": n_rows,
"maxItems": n_rows,
"items": {"description": str(series.name), **_dtype_to_schema(series.dtype)},
}
return result
def series_to_schema(series) -> JSON_TYPE:
result = _series_to_schema(series)
lale.type_checking.validate_is_schema(result)
return result
def _torch_tensor_to_schema(tensor) -> JSON_TYPE:
assert torch_installed, """Your Python environment does not have torch installed. You can install it with
pip install torch
or with
pip install 'lale[full]'"""
assert isinstance(tensor, Tensor)
result: JSON_TYPE
# https://pytorch.org/docs/stable/tensor_attributes.html#torch-dtype
if tensor.dtype == torch.bool:
result = {"type": "boolean"}
elif tensor.dtype == torch.uint8:
result = {"type": "integer", "minimum": 0, "maximum": 255}
elif torch.is_floating_point(tensor):
result = {"type": "number"}
else:
result = {"type": "integer"}
for dim in reversed(tensor.shape):
result = {"type": "array", "minItems": dim, "maxItems": dim, "items": result}
return result
def torch_tensor_to_schema(tensor) -> JSON_TYPE:
result = _torch_tensor_to_schema(tensor)
lale.type_checking.validate_is_schema(result)
return result
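# Hedged sketch (requires torch): the dtype picks the leaf schema and the
# shape drives the nesting, mirroring the ndarray case.
#
#   import torch
#   torch_tensor_to_schema(torch.zeros(2, 3))
#   # {'type': 'array', 'minItems': 2, 'maxItems': 2,
#   #  'items': {'type': 'array', 'minItems': 3, 'maxItems': 3,
#   #            'items': {'type': 'number'}}}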
def is_liac_arff(obj) -> bool:
expected_types = {
"description": str,
"relation": str,
"attributes": list,
"data": list,
}
if not isinstance(obj, dict):
return False
for k, t in expected_types.items():
if k not in obj or not isinstance(obj[k], t):
return False
return True
def _liac_arff_to_schema(larff) -> JSON_TYPE:
assert is_liac_arff(
larff
), """Your Python environment might contain an 'arff' package different from 'liac-arff'. You can install it with
pip install 'liac-arff>=2.4.0'
or with
pip install 'lale[full]'"""
n_rows, n_columns = len(larff["data"]), len(larff["attributes"])
def larff_type_to_schema(larff_type) -> JSON_TYPE:
if isinstance(larff_type, str):
a2j = {
"numeric": "number",
"real": "number",
"integer": "integer",
"string": "string",
}
return {"type": a2j[larff_type.lower()]}
assert isinstance(larff_type, list)
return {"enum": [*larff_type]}
items = [
{"description": attr[0], **larff_type_to_schema(attr[1])}
for attr in larff["attributes"]
]
result = {
"type": "array",
"minItems": n_rows,
"maxItems": n_rows,
"items": {
"type": "array",
"minItems": n_columns,
"maxItems": n_columns,
"items": items,
},
}
return result
def liac_arff_to_schema(larff) -> JSON_TYPE:
result = _liac_arff_to_schema(larff)
lale.type_checking.validate_is_schema(result)
return result
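# Hedged sketch of the liac-arff dictionary layout this expects (values are
# illustrative): string attribute types map to JSON types, list attribute
# types become enums.
#
#   larff = {
#       "description": "toy dataset",
#       "relation": "toy",
#       "attributes": [("x", "NUMERIC"), ("label", ["yes", "no"])],
#       "data": [[1.0, "yes"], [2.0, "no"]],
#   }
#   liac_arff_to_schema(larff)
#   # items: [{'description': 'x', 'type': 'number'},
#   #         {'description': 'label', 'enum': ['yes', 'no']}]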
def make_optional_schema(schema: JSON_TYPE) -> JSON_TYPE:
return {"anyOf": [schema, {"enum": [None]}]}
def _spark_df_to_schema(df) -> JSON_TYPE:
assert spark_installed, """Your Python environment does not have spark installed. You can install it with
pip install pyspark
"""
assert isinstance(df, SparkDataFrameWithIndex)
import pyspark.sql.types as stypes
from pyspark.sql.types import StructField, StructType
def maybe_make_optional(schema: JSON_TYPE, is_option: bool) -> JSON_TYPE:
if is_option:
return make_optional_schema(schema)
return schema
def spark_datatype_to_json_schema(dtype: stypes.DataType) -> JSON_TYPE:
if isinstance(dtype, stypes.ArrayType):
return {
"type": "array",
"items": maybe_make_optional(
spark_datatype_to_json_schema(dtype.elementType), dtype.containsNull
),
}
if isinstance(dtype, stypes.BooleanType):
return {"type": "boolean"}
if isinstance(dtype, stypes.DoubleType):
return {"type": "number"}
if isinstance(dtype, stypes.FloatType):
return {"type": "number"}
if isinstance(dtype, stypes.IntegerType):
return {"type": "integer"}
if isinstance(dtype, stypes.LongType):
return {"type": "integer"}
if isinstance(dtype, stypes.ShortType):
return {"type": "integer"}
if isinstance(dtype, stypes.NullType):
return {"enum": [None]}
if isinstance(dtype, stypes.StringType):
return {"type": "string"}
return {}
def spark_struct_field_to_json_schema(f: StructField) -> JSON_TYPE:
type_schema = spark_datatype_to_json_schema(f.dataType)
result = maybe_make_optional(type_schema, f.nullable)
if f.name is not None:
result["description"] = f.name
return result
def spark_struct_to_json_schema(
s: StructType, index_names, table_name: Optional[str] = None
) -> JSON_TYPE:
items = [
spark_struct_field_to_json_schema(f) for f in s if f.name not in index_names
]
num_items = len(items)
result = {
"type": "array",
"items": {
"type": "array",
"description": "rows",
"minItems": num_items,
"maxItems": num_items,
"items": items,
},
}
if table_name is not None:
result["description"] = table_name
return result
return spark_struct_to_json_schema(df.schema, df.index_names, get_table_name(df))
def spark_df_to_schema(df) -> JSON_TYPE:
result = _spark_df_to_schema(df)
lale.type_checking.validate_is_schema(result)
return result
def _to_schema(obj) -> JSON_TYPE:
result = None
if obj is None:
result = {"enum": [None]}
elif isinstance(obj, ndarray):
result = _ndarray_to_schema(obj)
elif isinstance(obj, csr_matrix):
result = _csr_matrix_to_schema(obj)
elif isinstance(obj, DataFrame):
result = _dataframe_to_schema(obj)
elif isinstance(obj, Series):
result = _series_to_schema(obj)
elif torch_installed and isinstance(obj, Tensor):
result = _torch_tensor_to_schema(obj)
elif is_liac_arff(obj):
result = _liac_arff_to_schema(obj)
elif isinstance(obj, list):
result = _list_tensor_to_schema(obj)
elif _is_spark_df(obj):
result = _spark_df_to_schema(obj)
elif lale.type_checking.is_schema(obj):
result = obj
# No need to validate the schema again
return result # type: ignore
if result is None:
raise ValueError(f"to_schema(obj), type {type(obj)}, value {obj}")
return result
def to_schema(obj) -> JSON_TYPE:
result = _to_schema(obj)
lale.type_checking.validate_is_schema(result)
return result
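# Sketch: to_schema is the single entry point used elsewhere in lale; it
# dispatches on the runtime type and then validates the resulting schema.
#
#   import numpy as np
#   import pandas as pd
#   to_schema(np.zeros((2, 2)))              # nested array-of-number schema
#   to_schema(pd.Series([1, 2], name="n"))   # array schema, description 'n'
#   to_schema([[1, 2], [3, 4]])              # via list-tensor inference
#   to_schema(None)                          # {'enum': [None]}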