Source code for lale.schema_utils

# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional, Set, Union

# Type definitions
JsonSchema = Dict[str, Any]
SchemaEnum = Set[Any]

STrue: JsonSchema = {}
SFalse: JsonSchema = {"not": STrue}

forOptimizerConstant: str = "forOptimizer"
forOptimizerConstantSuffix: str = "ForOptimizer"


[docs]def is_true_schema(s: JsonSchema) -> bool: return s is True or s == STrue
[docs]def is_false_schema(s: JsonSchema) -> bool: return s is False or s == SFalse
[docs]def is_lale_any_schema(s: JsonSchema) -> bool: if isinstance(s, dict): t = s.get("laleType", None) return t == "Any" else: return False
[docs]def getForOptimizer(obj, prop: str): return obj.get(prop + forOptimizerConstantSuffix, None)
[docs]def getMinimum(obj): prop = "minimum" m = obj.get(prop) mfo = getForOptimizer(obj, prop) if mfo is None: return m else: if m is not None and mfo < m: raise ValueError( f"A minimum ({m}) and a *smaller* minimumForOptimizer ({mfo}) was specified in {obj}" ) return mfo
[docs]def getMaximum(obj): prop = "maximum" m = obj.get(prop) mfo = getForOptimizer(obj, prop) if mfo is None: return m else: if m is not None and mfo > m: raise ValueError( f"A maximum ({m}) and a *greater* maximumForOptimizer ({mfo}) was specified in {obj}" ) return mfo
[docs]def getExclusiveMinimum(obj): prop = "exclusveMinimum" m = obj.get(prop) mfo = getForOptimizer(obj, prop) if mfo is None: return m else: return mfo
[docs]def getExclusiveMaximum(obj): prop = "exclusiveMaximum" m = obj.get(prop) mfo = getForOptimizer(obj, prop) if mfo is None: return m else: return mfo
[docs]def isForOptimizer(s: JsonSchema) -> bool: if isinstance(s, dict): return s.get(forOptimizerConstant, True) else: return True
[docs]def makeSingleton_(k: str, schemas: List[JsonSchema]) -> JsonSchema: if len(schemas) == 0: return {} if len(schemas) == 1: return schemas[0] else: return {k: schemas}
[docs]def makeAllOf(schemas: List[JsonSchema]) -> JsonSchema: return makeSingleton_("allOf", schemas)
[docs]def makeAnyOf(schemas: List[JsonSchema]) -> JsonSchema: return makeSingleton_("anyOf", schemas)
[docs]def makeOneOf(schemas: List[JsonSchema]) -> JsonSchema: return makeSingleton_("oneOf", schemas)
[docs]def forOptimizer(schema: JsonSchema) -> Optional[JsonSchema]: if schema is None or schema is True or schema is False: return schema if not isForOptimizer(schema): return None if "anyOf" in schema: subs = schema["anyOf"] sch = [forOptimizer(s) for s in subs] sch_nnil = [s for s in sch if s is not None] if sch_nnil: return makeAnyOf(sch_nnil) else: return None if "allOf" in schema: subs = schema["allOf"] sch = [forOptimizer(s) for s in subs] sch_nnil = [s for s in sch if s is not None] filtered_sch = sch_nnil if len(sch_nnil) != len(sch): # Questionable semantics here (aka HACK!!!!) # Since we removed something from the schema # we will also remove negated schemas filtered_sch = [ s for s in sch_nnil if not isinstance(s, dict) or "not" not in s ] if filtered_sch: return makeAllOf(filtered_sch) else: return None if "oneOf" in schema: subs = schema["oneOf"] sch = [forOptimizer(s) for s in subs] sch_nnil = [s for s in sch if s is not None] if sch_nnil: return makeOneOf(sch_nnil) else: return None if "not" in schema: s = forOptimizer(schema["not"]) if s is None: return None else: return {"not": s} transformedSchema: JsonSchema = {} for k, v in schema.items(): if k.endswith(forOptimizerConstantSuffix): base: str = k[: -len(forOptimizerConstantSuffix)] transformedSchema[base] = v elif k not in transformedSchema: transformedSchema[k] = v schema = transformedSchema if "type" in schema and schema["type"] == "object" and "properties" in schema: # required = schema.get("required", None) props = {} for k, v in schema["properties"].items(): s = forOptimizer(v) if s is None: # if required and k in required: # if this field is required (and has now been filtered) # filter the whole object schema return None else: props[k] = s ret = schema.copy() ret["properties"] = props return ret return schema
[docs]def has_operator(schema: JsonSchema) -> bool: to = schema.get("laleType", None) if to == "operator": return True if "not" in schema: if has_operator(schema["not"]): return True if "anyOf" in schema: if any(has_operator(s) for s in schema["anyOf"]): return True if "allOf" in schema: if any(has_operator(s) for s in schema["allOf"]): return True if "oneOf" in schema: if any(has_operator(s) for s in schema["oneOf"]): return True if "items" in schema: it = schema["items"] if isinstance(it, list): if any(has_operator(s) for s in it): return True else: if has_operator(it): return True if "properties" in schema: props = schema["properties"] if any(has_operator(s) for s in props.values()): return True if "patternProperties" in schema: pattern_props = schema["patternProperties"] if any(has_operator(s) for s in pattern_props.values()): return True if "additionalProperties" in schema: add_props = schema["additionalProperties"] if not isinstance(add_props, bool): if has_operator(add_props): return True if "dependencies" in schema: depends = schema["dependencies"] for d in depends.values(): if not isinstance(d, list): if has_operator(d): return True # if we survived all of the checks, then we return False
[docs]def atomize_schema_enumerations( schema: Union[None, JsonSchema, List[JsonSchema]] ) -> None: """Given a schema, converts structured enumeration values (records, arrays) into schemas where the structured part is specified as a schema, with the primitive as the enum. """ if schema is None: return if isinstance(schema, list): for s in schema: atomize_schema_enumerations(s) return if not isinstance(schema, dict): return for key in ["anyOf", "allOf", "oneOf", "items", "additionalProperties", "not"]: atomize_schema_enumerations(schema.get(key, None)) for key in ["properties", "patternProperties", "dependencies"]: v = schema.get(key, None) if v is not None: atomize_schema_enumerations(list(v.values())) # now that we have handled all the recursive cases ev = schema.get("enum", None) if ev is not None: simple_evs: List[Any] = [] complex_evs: List[JsonSchema] = [] for e in ev: if isinstance(e, dict): required: List[str] = [] props: Dict[str, JsonSchema] = {} for k, v in e.items(): required.append(k) vs = {"enum": [v]} atomize_schema_enumerations(vs) props[k] = vs ds = { "type": "object", "additionalProperties": False, "required": list(e.keys()), "properties": props, } complex_evs.append(ds) elif isinstance(e, (list, tuple)): is_tuple = isinstance(e, tuple) items_len = len(e) items: List[JsonSchema] = [] for v in e: vs = {"enum": [v]} atomize_schema_enumerations(vs) items.append(vs) ls = { "type": "array", "items": items, "additionalItems": False, "minItems": items_len, "maxItems": items_len, } if is_tuple: ls["laleType"] = "tuple" complex_evs.append(ls) else: simple_evs.append(ev) if complex_evs: del schema["enum"] if simple_evs: complex_evs.append({"enum": simple_evs}) if len(complex_evs) == 1: # special case, just update in place schema.update(complex_evs[0]) else: schema["anyOf"] = complex_evs
[docs]def check_operators_schema( schema: Optional[Union[List[JsonSchema], JsonSchema]], warnings: List[str] ) -> None: """Given a schema, collect warnings if there are any enumeration with all Operator values that are not marked as `'laleType':'operator'`. This should be called after simplification. """ if schema is None: return if isinstance(schema, list): for s in schema: check_operators_schema(s, warnings) return if not isinstance(schema, dict): return for key in ["anyOf", "allOf", "oneOf", "items", "additionalProperties", "not"]: v = schema.get(key, None) if v is not None: check_operators_schema(v, warnings) for key in ["properties", "patternProperties", "dependencies"]: v = schema.get(key, None) if v is not None: check_operators_schema(list(v.values()), warnings) if "enum" in schema: es = schema["enum"] if es: to = schema.get("laleType", None) if to != "operator": from lale.operators import Operator if all(isinstance(e, Operator) for e in es): warnings.append( f"The enumeration {[e.name() for e in es]} is all lale operators, but the schema fragment {schema} it is part of does not stipulate that it should be 'laleType':'operator'. While legal, this likely indicates either an omission in the schema or a bug in the schema simplifier" )