# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from typing import (
Any,
Dict,
Generic,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
import jsonschema
from .schema_ranges import SchemaRange
from .schema_utils import (
JsonSchema,
SFalse,
STrue,
is_false_schema,
is_lale_any_schema,
is_true_schema,
isForOptimizer,
makeAllOf,
makeAnyOf,
makeOneOf,
)
from .type_checking import always_validate_schema
logger = logging.getLogger(__name__)
# Goal: given a json schema, convert it into an equivalent json-schema
# in "grouped-dnf" form:
# allOf: [anyOf: nochoice], where
# nochoice
#
# initial version, which does not try to group things intelligently:
# allOf [anyOf [P1 P2], anyOf[Q1 Q2]] ==
# anyOf [map allOf [Ps]x[Pqs]]
# Note that P1 == anyOf [P] == allOf [P]
# Given a schema, if it is an anyOf, return the list of choices.
# Otherwise, return a singleton choice -- the schema
# Enumerations should logically be sets; however, their elements
# are not necessarily hashable, so a plain Python set cannot be used.
VV = TypeVar("VV")


class set_with_str_for_keys(Generic[VV]):
    """A set-like container that keys elements by their string representation.

    Membership, intersection, union, and difference all compare elements via
    ``str(element)``, so unhashable elements are supported as long as ``str``
    is injective over them.
    """

    # Backing store: maps str(element) -> element.
    _elems: Dict[str, VV]

    def __init__(self, elems: Union[Dict[str, VV], Iterable[VV]]):
        if isinstance(elems, dict):
            # The type hint is needed since technically a Dict[str, something_else]
            # is an Iterable[str], which could match the latter type,
            # but pass this type guard
            self._elems = elems  # type: ignore
        else:
            self._elems = {str(v): v for v in elems}

    def __iter__(self):
        return iter(self._elems.values())

    def __bool__(self):
        return bool(self._elems)

    def __str__(self):
        return str(list(self._elems.values()))

    def __contains__(self, key):
        # NOTE: membership is tested against the *string* keys.
        return key in self._elems

    def union(self, *others):
        """Return a new set containing the elements of self and all others."""
        combined: Dict[str, VV] = dict(self._elems)
        for other in others:
            for elem in other:
                combined[str(elem)] = elem
        return set_with_str_for_keys(combined)

    def intersection(self, *others: "set_with_str_for_keys[VV]"):
        """Return a new set with the elements of self present in every other."""
        kept: Dict[str, VV] = {
            k: v
            for k, v in self._elems.items()
            if all(k in other for other in others)
        }
        return set_with_str_for_keys(kept)

    def difference(self, *others):
        """Return a new set with the elements of self absent from every other."""
        kept: Dict[str, VV] = {
            k: v
            for k, v in self._elems.items()
            if not any(k in other for other in others)
        }
        return set_with_str_for_keys(kept)
def toAnyOfList(schema: JsonSchema) -> List[JsonSchema]:
    """Return the disjuncts of the schema: its 'anyOf' list if present,
    otherwise a singleton list containing the schema itself."""
    return schema["anyOf"] if "anyOf" in schema else [schema]
def toAllOfList(schema: JsonSchema) -> List[JsonSchema]:
    """Return the conjuncts of the schema: its 'allOf' list if present,
    otherwise a singleton list containing the schema itself."""
    return schema["allOf"] if "allOf" in schema else [schema]
def liftAllOf(schemas: List[JsonSchema]) -> Iterable[JsonSchema]:
    """Given a list of schemas, if any of them are
    allOf schemas, lift them out to the top level
    """
    for outer in schemas:
        if "allOf" in outer:
            # flatten one level of conjunction
            yield from outer["allOf"]
        else:
            yield outer
def liftAnyOf(schemas: List[JsonSchema]) -> Iterable[JsonSchema]:
    """Given a list of schemas, if any of them are
    anyOf schemas, lift them out to the top level
    """
    for outer in schemas:
        if "anyOf" in outer:
            # flatten one level of disjunction
            yield from outer["anyOf"]
        else:
            yield outer
# This is a great function for a breakpoint :-)
def impossible() -> JsonSchema:
    """Return the unsatisfiable (False) schema.

    Kept as a separate function so that a breakpoint set here catches every
    place where simplification proves a schema unsatisfiable.
    """
    return SFalse
def enumValues(
    es: set_with_str_for_keys[Any], s: JsonSchema
) -> set_with_str_for_keys[Any]:
    """Given an enumeration set and a schema, return all the consistent values of the enumeration."""

    def _keep(e) -> bool:
        # A value stays in the enumeration only if it validates against the schema.
        try:
            always_validate_schema(e, s)
            return True
        except jsonschema.ValidationError:
            logger.debug(
                f"enumValues: {e} removed from {es} because it does not validate according to {s}"
            )
            return False

    return set_with_str_for_keys(e for e in es if _keep(e))
# invariants for all the simplify* functions:
# - invariant: if floatAny then at most the top level return value will be 'anyOf'
# - invariant: if there is no (nested or top level) 'anyOf' then the result will not have any either
# Schema fields that carry documentation rather than constraints; when
# conjoining schemas these are hoisted out and merged separately.
extra_field_names: List[str] = ["default", "description"]
def hasAllOperatorSchemas(schemas: List[JsonSchema]) -> bool:
    """Return True iff `schemas` is non-empty and every schema in it describes
    an operator, either directly via laleType or recursively through its
    anyOf/allOf structure."""
    if not schemas:
        return False
    for sub in schemas:
        if "anyOf" in sub:
            ok = hasAnyOperatorSchemas(sub["anyOf"])
        elif "allOf" in sub:
            ok = hasAllOperatorSchemas(sub["allOf"])
        else:
            ok = sub.get("laleType", None) == "operator"
        if not ok:
            return False
    return True
def hasAnyOperatorSchemas(schemas: List[JsonSchema]) -> bool:
    """Return True iff at least one schema in `schemas` describes an operator,
    either directly via laleType or recursively through its anyOf/allOf
    structure."""
    for sub in schemas:
        if "anyOf" in sub:
            if hasAnyOperatorSchemas(sub["anyOf"]):
                return True
        elif "allOf" in sub:
            if hasAllOperatorSchemas(sub["allOf"]):
                return True
        elif sub.get("laleType", None) == "operator":
            return True
    return False
def simplifyAll(schemas: List[JsonSchema], floatAny: bool) -> JsonSchema:
    """Simplify the conjunction (allOf) of ``schemas`` into an equivalent,
    more compact schema.

    If ``floatAny`` is true, nested anyOf schemas are floated out so that at
    most the top level of the result is an anyOf; this may cause a
    combinatorial blowup in the size of the returned schema.
    Returns the False schema if the conjunction is proved unsatisfiable.

    BUG FIX: the object-merging code used to ``del s_props[p]`` while
    iterating ``s_props`` directly, which raises
    ``RuntimeError: dictionary changed size during iteration`` the first time
    a key is actually removed; it now iterates over a snapshot of the keys.
    """
    # First, we partition the schemas into the different types
    # that we care about
    combined_original_schema: JsonSchema = {"allOf": schemas}
    s_all: List[JsonSchema] = schemas
    s_any: List[List[JsonSchema]] = []
    s_one: List[JsonSchema] = []
    s_not: List[JsonSchema] = []
    s_not_number_list: List[JsonSchema] = (
        []
    )  # a list of schemas that are a top level 'not' with a type='integer' or 'number' under it
    s_not_enum_list: List[set_with_str_for_keys[Any]] = []
    s_enum_list: List[set_with_str_for_keys[Any]] = []
    s_type: Optional[str] = None
    s_type_for_optimizer: Optional[str] = None
    s_typed: List[JsonSchema] = []
    s_other: List[JsonSchema] = []
    s_not_for_optimizer: List[JsonSchema] = []
    s_extra: Dict[str, Any] = {}
    while s_all:
        worklist: List[JsonSchema] = s_all
        s_all = []
        s: JsonSchema
        for s in worklist:
            if s is None:
                continue
            s = simplify(s, floatAny)
            if s is None:
                continue
            if not isForOptimizer(s):
                logger.info(
                    f"simplifyAll: skipping not for optimizer {s} (after simplification)"
                )
                s_not_for_optimizer.append(s)
                continue
            if is_true_schema(s):
                continue
            if is_false_schema(s):
                return SFalse
            if is_lale_any_schema(s):
                continue
            if "allOf" in s:
                # nested conjunctions are flattened into the worklist
                s_all.extend(s["allOf"])
            elif "anyOf" in s:
                s_any.append(s["anyOf"])
            elif "oneOf" in s:
                s_one.append(s)
            elif "not" in s:
                snot = s["not"]
                if snot is None:
                    continue
                if "enum" in snot:
                    # keep only the excluded values that the rest of the
                    # conjunction does not already rule out
                    ev = enumValues(
                        set_with_str_for_keys(snot["enum"]),
                        {"not": combined_original_schema},
                    )
                    s_not_enum_list.append(ev)
                elif "type" in snot and (
                    snot["type"] == "number" or snot["type"] == "integer"
                ):
                    s_not_number_list.append(s)
                else:
                    s_not.append(s)
            elif "enum" in s:
                ev = enumValues(
                    set_with_str_for_keys(s["enum"]), combined_original_schema
                )
                if ev:
                    s_enum_list.append(ev)
                    for k in extra_field_names:
                        if k in s:
                            d = s[k]
                            if k in s_extra and s_extra[k] != d:
                                logger.info(
                                    f"mergeAll: conflicting {k} fields: {s_extra[k]} and {d} found when merging schemas {schemas}"
                                )
                            else:
                                s_extra[k] = d
                else:
                    logger.info(
                        f"simplifyAll: {schemas} is not a satisfiable list of conjoined schemas because the enumeration {list(s['enum'])} has no elements that are satisfiable by the conjoined schemas"
                    )
                    return impossible()
            elif "type" in s:
                t = s.get("type", None)
                to = s.get("laleType", None)
                if t == "array":
                    # tuples are distinct from arrays
                    if to is not None and to == "tuple":
                        t = to
                if s_type:
                    # handle subtyping relation between integers and numbers
                    if (
                        s_type == "number"
                        and t == "integer"
                        or s_type == "integer"
                        and t == "number"
                    ):
                        s_type = "integer"
                    elif s_type != t:
                        logger.info(
                            f"simplifyAll: {schemas} is not a satisfiable list of conjoined schemas because {s} has type '{t}' and a previous schema had type '{s_type}'"
                        )
                        return impossible()
                else:
                    s_type = t
                s_typed.append(s)
            elif "XXX TODO XXX" in s and len(s) == 1:
                # Ignore missing constraints
                pass
            else:
                to = s.get("laleType", None)
                if to is None:
                    logger.warning(f"simplifyAll: '{s}' has unknown type")
                s_other.append(s)
            # regardless of the shape of s, remember operator annotations
            to = s.get("laleType", None)
            if to == "operator":
                if (
                    s_type_for_optimizer is not None
                    and s_type_for_optimizer != "operator"
                ):
                    logger.error(
                        f"simplifyAll: '{s}' has operator type for optimizer, but we also have another type for optimizer saved"
                    )
                s_type_for_optimizer = to
    # Now that we have partitioned things
    # Note: I am sure some of our assumptions here are not correct :-(, but this should do for now :-)
    # let's try to find a quick contradiction
    if s_not or s_not_number_list:
        # a bit of a special case here (which should eventually be replaced by more principled logic):
        # if one of the not cases is identical to one of the extra cases
        # then this entire case is impossible.
        # This provides a workaround to #42 amongst other problems
        # first gather the set of extras
        pos_k: Set[str] = set()
        pk: JsonSchema
        for pk in s_typed:
            pos_k.add(str(pk))
        for sn in itertools.chain(s_not, s_not_number_list):
            snn = sn["not"]
            if str(snn) in pos_k:
                logger.info(
                    f"simplifyAll: Contradictory schema {str(combined_original_schema)} contains both {str(snn)} and its negation"
                )
                return impossible()
    # first, we simplify enumerations
    s_enum: Optional[set_with_str_for_keys[Any]] = None
    s_not_enum: Optional[set_with_str_for_keys[Any]] = None
    if s_enum_list:
        # if there are enumeration constraints, we want their intersection
        # pylint note: s_enum_list must be non-empty, and the first element will be used as self
        s_enum = (
            set_with_str_for_keys.intersection(  # pylint:disable=no-value-for-parameter
                *s_enum_list
            )
        )
        if not s_enum:
            # This means that enumeration values were specified
            # but none are possible, so this schema is impossible to satisfy
            logger.info(
                f"simplifyAll: {schemas} is not a satisfiable list of conjoined schemas because the conjugation of these enumerations {list(s_enum_list)} is unsatisfiable (the intersection is empty)"
            )
            return impossible()
    if s_not_enum_list:
        # pylint note: s_not_enum_list must be non-empty, and the first element will be used as self
        s_not_enum = (
            set_with_str_for_keys.union(  # pylint:disable=no-value-for-parameter
                *s_not_enum_list
            )
        )
    if s_enum and s_not_enum:
        s_enum_diff = set_with_str_for_keys.difference(s_enum, s_not_enum)
        if not s_enum_diff:
            # This means that enumeration values were specified
            # but none are possible, so this schema is impossible to satisfy
            logger.info(
                f"simplifyAll: {schemas} is not a satisfiable list of conjoined schemas because the conjugation of the enumerations is {s_enum} all of which are excluded by the conjugation of the disallowed enumerations {s_not_enum}"
            )
            return impossible()
        s_enum = s_enum_diff
        s_not_enum = None
    # break out, combine, and keep 'extra' fields, like description
    if s_typed:
        s_typed = [s.copy() for s in s_typed]
        for o in s_typed:
            for k in extra_field_names:
                if k in o:
                    d = o[k]
                    if k in s_extra and s_extra[k] != d:
                        logger.info(
                            f"mergeAll: conflicting {k} fields: {s_extra[k]} and {d} found when merging schemas {schemas}"
                        )
                    else:
                        s_extra[k] = d
                    del o[k]
        s_typed = [s for s in s_typed if s]
        if s_type in ["number", "integer"]:
            # First we combine all the positive number range schemas
            s_range = SchemaRange()
            s_range_for_optimizer = SchemaRange()
            for o in s_typed:
                o_range = SchemaRange.fromSchema(o)
                s_range &= o_range
                o_range_for_optimizer = SchemaRange.fromSchemaForOptimizer(o)
                s_range_for_optimizer &= o_range_for_optimizer
            # now let us look at negative number ranges
            # for now, we will not handle cases that would require splitting ranges
            # TODO: 42 is about handling more reasoning
            s_not_list = s_not_number_list
            s_not_number_list = []
            for s in s_not_list:
                snot = s["not"]
                o_range = SchemaRange.fromSchema(snot)
                success = s_range.diff(o_range)
                if success is None:
                    logger.info(
                        f"simplifyAll: [range]: {s} is not a satisfiable schema, since it negates everything, falsifying the entire combined schema {combined_original_schema}"
                    )
                    return impossible()
                o_range_for_optimizer = SchemaRange.fromSchemaForOptimizer(snot)
                success2 = s_range_for_optimizer.diff(o_range_for_optimizer)
                if success2 is None:
                    logger.info(
                        f"simplifyAll: [range]: {s} is not a satisfiable schema for the optimizer, since it negates everything, falsifying the entire combined schema {combined_original_schema}"
                    )
                    return impossible()
                elif success is False or success2 is False:
                    s_not_number_list.append(s)
            # Now we look at negative enumerations.
            # for now, we will not handle cases that would require splitting ranges
            # TODO: 42 is about handling more reasoning
            if s_not_enum:
                s_cur_not_enum_list: set_with_str_for_keys[Any] = s_not_enum
                s_not_enum_l: List[Any] = []
                for s in s_cur_not_enum_list:
                    if isinstance(s, (int, float)):
                        success = s_range.remove_point(s)
                        if success is None:
                            logger.info(
                                f'simplifyAll: [range]: {{"not": {{"enum": [{s}]}}}} is not a satisfiable schema, since it negates everything, falsifying the entire combined schema {combined_original_schema}'
                            )
                            return impossible()
                        success2 = s_range_for_optimizer.remove_point(s)
                        if success2 is None:
                            logger.info(
                                f'simplifyAll: [range]: {{"not": {{"enum": [{s}]}}}} is not a satisfiable schema for the optimizer, since it negates everything, falsifying the entire combined schema {combined_original_schema}'
                            )
                            return impossible()
                        elif success is False or success2 is False:
                            s_not_enum_l.append(s)
                s_not_enum = set_with_str_for_keys(iter(s_not_enum_l))
            # now let us put everything back together
            number_schema = SchemaRange.to_schema_with_optimizer(
                s_range, s_range_for_optimizer
            )
            if SchemaRange.is_empty2(s_range, s_range):
                logger.info(
                    f"simplifyAll: [range]: range simplification determined that the required minimum is greater than the required maximum, so the entire thing is unsatisfiable {combined_original_schema}"
                )
                # if the actual range is empty, the entire schema is invalid
                return impossible()
            elif SchemaRange.is_empty2(s_range_for_optimizer, s_range):
                number_schema["forOptimizer"] = SFalse
                logger.info(
                    f"simplifyAll: [range]: range simplification determined that the required minimum for the optimizer is greater than the required maximum, so the range is being marked as not for the optimizer: {number_schema}"
                )
            elif SchemaRange.is_empty2(s_range, s_range_for_optimizer):
                number_schema["forOptimizer"] = SFalse
                logger.info(
                    f"simplifyAll: [range]: range simplification determined that the required minimum is greater than the required maximum for the optimizer, so the range is being marked as not for the optimizer: {number_schema}"
                )
            elif SchemaRange.is_empty2(s_range_for_optimizer, s_range_for_optimizer):
                logger.info(
                    f"simplifyAll: [range]: range simplification determined that the required minimum for the optimizer is greater than the required maximum for the optimizer, so the range is being marked as not for the optimizer: {number_schema}"
                )
                number_schema["forOptimizer"] = SFalse
            s_typed = [number_schema]
        elif s_type == "object":
            # if this is an object type, we want to merge the properties
            s_required: Set[str] = set()
            s_props: Dict[str, List[JsonSchema]] = {}
            # TODO: generalize this to handle schema types here
            s_additionalProperties = True
            # propertyNames = []
            for o in s_typed:
                o_required = o.get("required", None)
                if o_required:
                    s_required = s_required.union(o_required)
                # TODO: handle empty/absent properties case
                if "properties" in o:
                    o_props = o["properties"]
                else:
                    o_props = {}
                o_additionalProperties = (
                    "additionalProperties" not in o or o["additionalProperties"]
                )
                # safety check:
                if not o_additionalProperties:
                    for p in s_required:
                        if p not in o_props:
                            # There is a required key, but our schema
                            # does not contain that key and does not allow additional properties
                            # This schema can never be satisfied, so we can simplify this whole thing to the False schema
                            logger.info(
                                f"simplifyAll: {s_typed} is not a mergable list of schemas because {o} does not have the required key '{p}' and excludes additional properties"
                            )
                            return impossible()
                # If we do not allow additional properties
                # Remove all existing properties that are
                # not in our schema
                if not o_additionalProperties:
                    # iterate over a snapshot: deleting from a dict while
                    # iterating it directly raises RuntimeError
                    for p in list(s_props.keys()):
                        if p not in o_props:
                            del s_props[p]
                # now go through our properties and add them
                for p, pv in o_props.items():
                    if p in s_props:
                        s_props[p].append(pv)
                    elif s_additionalProperties:
                        s_props[p] = [pv]
                s_additionalProperties = s_additionalProperties and o_additionalProperties
            # at this point, we have aggregated the object schemas
            # for all the properties in them
            if s_required and not s_additionalProperties:
                for k in s_required:
                    if k not in s_props:
                        logger.info(
                            f"simplifyAll: {s_typed} is not a mergable list of schemas because one of the schemas requires key '{k}', which is not in the other schemas, and a different schema excluded additional properties"
                        )
                        return impossible()
            merged_props = {p: simplifyAll(v, False) for p, v in s_props.items()}
            if s_required:
                for k in s_required:
                    # if the schema is not present, it could be in another branch (such as an anyOf conjunct)
                    if is_false_schema(merged_props.get(k, STrue)):
                        logger.info(
                            f"simplifyAll: required key {k} is False, so the entire conjugation of schemas {schemas} is False"
                        )
                        return impossible()
            obj: Dict[Any, Any] = {}
            obj["type"] = "object"
            if merged_props:
                obj["properties"] = merged_props
            if not s_additionalProperties:
                obj["additionalProperties"] = False
            if len(s_required) != 0:
                obj["required"] = list(s_required)
            s_typed = [obj]
        elif s_type in ["array", "tuple"]:
            is_tuple = s_type == "tuple"
            min_size: int = 0
            max_size: Optional[int] = None
            min_size_for_optimizer: int = 0
            max_size_for_optimizer: Optional[int] = None
            longest_item_list: int = 0
            items_schemas: List[JsonSchema] = []
            item_list_entries: List[Tuple[List[JsonSchema], Optional[JsonSchema]]] = []
            for arr in s_typed:
                arr_min_size = arr.get("minItems", 0)
                min_size = max(min_size, arr_min_size)
                arr_min_size_for_optimizer = arr.get("minItemsForOptimizer", 0)
                min_size_for_optimizer = max(
                    min_size_for_optimizer, arr_min_size_for_optimizer
                )
                arr_max_size = arr.get("maxItems", None)
                if arr_max_size is not None:
                    if max_size is None:
                        max_size = arr_max_size
                    else:
                        max_size = min(max_size, arr_max_size)
                arr_max_size_for_optimizer = arr.get("maxItemsForOptimizer", None)
                if arr_max_size_for_optimizer is not None:
                    if max_size_for_optimizer is None:
                        max_size_for_optimizer = arr_max_size_for_optimizer
                    else:
                        max_size_for_optimizer = min(
                            max_size_for_optimizer, arr_max_size_for_optimizer
                        )
                arr_item = arr.get("items", None)
                if arr_item is not None:
                    if isinstance(arr_item, list):
                        arr_item_len = len(arr_item)
                        longest_item_list = max(longest_item_list, arr_item_len)
                        arr_additional = arr.get("additionalItems", None)
                        item_list_entries.append((arr_item, arr_additional))
                        if arr_additional is False:
                            # If we are not allowed additional elements,
                            # that effectively sets the maximum allowed length
                            if max_size is None:
                                max_size = arr_item_len
                            else:
                                max_size = min(max_size, arr_item_len)
                    else:
                        items_schemas.append(arr_item)
            # We now have accurate min/max bounds, and if there are item lists
            # we know how long the longest one is
            # additionally, we have gathered up all the item (object) schemas
            ret_arr: Dict[str, Any] = {"type": "array"}
            if is_tuple:
                ret_arr["laleType"] = "tuple"
            if min_size > 0:
                ret_arr["minItems"] = min_size
            if min_size_for_optimizer > min_size:
                ret_arr["minItemsForOptimizer"] = min_size_for_optimizer
            all_items_schema: Optional[JsonSchema] = None
            if items_schemas:
                all_items_schema = simplifyAll(items_schemas, floatAny=floatAny)
            if not item_list_entries:
                # there are no list items schemas
                assert longest_item_list == 0
                if all_items_schema:
                    # deal with False schemas
                    if is_false_schema(all_items_schema):
                        if min_size > 0 or min_size_for_optimizer > 0:
                            return impossible()
                        else:
                            max_size = 0
                            max_size_for_optimizer = None
                    ret_arr["items"] = all_items_schema
            else:
                ret_item_list_list: List[List[JsonSchema]] = [
                    [] for _ in range(longest_item_list)
                ]
                additional_schemas: List[JsonSchema] = []
                for arr_item_list, arr_additional_schema in item_list_entries:
                    for x in range(longest_item_list):
                        ils = ret_item_list_list[x]
                        if x < len(arr_item_list):
                            ils.append(arr_item_list[x])
                        elif arr_additional_schema:
                            ils.append(arr_additional_schema)
                        if all_items_schema:
                            ils.append(all_items_schema)
                    if arr_additional_schema:
                        additional_schemas.append(arr_additional_schema)
                if max_size is None or max_size > longest_item_list:
                    # if it is possible to have more elements
                    # we constrain them as specified
                    if additional_schemas:
                        if all_items_schema is not None:
                            additional_schemas.append(all_items_schema)
                        all_items_schema = simplifyAll(
                            additional_schemas, floatAny=floatAny
                        )
                    if all_items_schema is not None:
                        ret_arr["additionalItems"] = all_items_schema
                ret_item_list: List[JsonSchema] = [
                    simplifyAll(x, floatAny=True) for x in ret_item_list_list
                ]
                first_false: Optional[int] = None
                for i, s in enumerate(ret_item_list):
                    if is_false_schema(s):
                        first_false = i
                        break
                if first_false is not None:
                    if min_size > first_false or min_size_for_optimizer > first_false:
                        return impossible()
                    else:
                        if max_size is None:
                            max_size = first_false
                        else:
                            max_size = min(max_size, first_false)
                        if max_size_for_optimizer is not None:
                            if max_size_for_optimizer >= max_size:
                                max_size_for_optimizer = None
                        ret_item_list = ret_item_list[0:first_false]
                ret_arr["items"] = ret_item_list
            if max_size is not None:
                ret_arr["maxItems"] = max_size
            if max_size_for_optimizer is not None:
                if max_size is None or max_size_for_optimizer < max_size:
                    ret_arr["maxItemsForOptimizer"] = max_size_for_optimizer
            s_typed = [ret_arr]
        # TODO: more!
    assert not s_all
    ret_all = []
    ret_main: JsonSchema = s_extra if s_extra else {}
    if s_type_for_optimizer is not None:
        ret_main["laleType"] = s_type_for_optimizer
    if s_enum:
        # we should simplify these as for s_not_enum
        ret_main["enum"] = list(s_enum)
        # now, we do some extra work to keep 'laleType':'operator' annotations
        if s_type_for_optimizer is None:
            # imported locally, presumably to avoid a circular import -- TODO confirm
            from lale.operators import Operator

            if all(isinstance(x, Operator) for x in s_enum):
                # All the enumeration values are operators
                # This means it is probably an operator schema
                # which might have been missed if
                # this is being allOf'ed with an anyOfList
                if s_any and all(hasAnyOperatorSchemas(s) for s in s_any):
                    ret_main["laleType"] = "operator"
        return ret_main
    if ret_main:
        if s_typed:
            s_typed[0] = {**ret_main, **s_typed[0]}
        elif s_other:
            s_other[0] = {**ret_main, **s_other[0]}
        else:
            ret_all.append(ret_main)
    if s_typed:
        ret_all.extend(s_typed)
    if s_other:
        ret_all.extend(s_other)
    if s_not_for_optimizer:
        ret_all.extend(s_not_for_optimizer)
    if s_one:
        ret_all.extend(s_one)
    if s_not_number_list:
        ret_all.extend(s_not_number_list)
    if s_not:
        ret_all.extend(s_not)
    if s_not_enum:
        # We can't do not alongside anything else
        # TODO: we should validate the list against the
        # other parts of ret_all (this would need to move down): if any elements don't validate
        # then they already would be excluded
        # we can simplify +enum's the same way
        ret_all_agg = makeAllOf(ret_all)
        s_not_enum_simpl = enumValues(s_not_enum, ret_all_agg)
        if s_not_enum_simpl:
            sne = {"not": {"enum": list(s_not_enum)}}
            ret_all.append(sne)
        else:
            logger.debug(
                f"simplifyAll: {s_not_enum} was a negated enum that was simplified away because its elements anyway don't satisfy the additional constraints {ret_all_agg}"
            )
        s_not_enum = s_not_enum_simpl
    if not floatAny:
        ret_all.extend([simplifyAny(s, False) for s in s_any])
    ret_all_schema = makeAllOf(ret_all)
    if floatAny and s_any:
        # take the cross product of the collected disjunctions with the
        # combined conjunction, producing a top-level anyOf of allOfs
        args = list(([ret_all_schema], *tuple(s_any)))
        cp = list(itertools.product(*args))
        alls = [simplifyAll(list(s), False) for s in cp]
        ret = simplifyAny(alls, False)
        return ret
    else:
        return ret_all_schema
def simplifyAny(schema: List[JsonSchema], floatAny: bool) -> JsonSchema:
    """Simplify the disjunction (anyOf) of the given list of schemas into an
    equivalent, more compact schema.

    Nested anyOfs are flattened, enumerations (and negated enumerations) are
    combined, and disjuncts the optimizer cannot use are kept but set aside.

    BUG FIX: previously, a 'not' disjunct whose body was not an enum fell
    through without being recorded anywhere and was silently dropped, which
    shrinks the disjunction and makes the simplified schema strictly narrower
    than the original (simplifyAll keeps the symmetric case).  Such schemas
    are now kept as opaque disjuncts in s_other.
    """
    s_any = schema
    s_enum_list: List[set_with_str_for_keys[Any]] = []
    s_not_enum_list: List[set_with_str_for_keys[Any]] = []
    s_other: List[JsonSchema] = []
    s_not_for_optimizer: List[JsonSchema] = []
    while s_any:
        schema_list = s_any
        s_any = []
        for s in schema_list:
            if s is None:
                continue
            s = simplify(s, floatAny)
            if s is None:
                continue
            if not isForOptimizer(s):
                logger.info(
                    f"simplifyAny: skipping not for optimizer {s} (after simplification)"
                )
                s_not_for_optimizer.append(s)
                continue
            if is_true_schema(s):
                # a True disjunct makes the whole disjunction True
                return STrue
            if is_false_schema(s):
                # a False disjunct contributes nothing
                continue
            if "anyOf" in s:
                s_any.extend(s["anyOf"])
            elif "enum" in s:
                ev = enumValues(set_with_str_for_keys(s["enum"]), s)
                if ev:
                    s_enum_list.append(ev)
            elif "not" in s:
                snot = s["not"]
                if "enum" in snot:
                    ev = enumValues(set_with_str_for_keys(snot["enum"]), snot)
                    if ev:
                        s_not_enum_list.append(ev)
                else:
                    # keep non-enum negations as opaque disjuncts (see BUG FIX
                    # note in the docstring)
                    s_other.append(s)
            else:
                s_other.append(s)
    s_enum: Optional[set_with_str_for_keys[Any]] = None
    s_not_enum: Optional[set_with_str_for_keys[Any]] = None
    if s_enum_list:
        # for a disjunction, the allowed enumeration values are the union
        # pylint note: s_enum_list must be non-empty, and the first element will be used as self
        s_enum = set_with_str_for_keys.union(  # pylint:disable=no-value-for-parameter
            *s_enum_list
        )
    if s_not_enum_list:
        # for a disjunction, the excluded values are the intersection
        # pylint note: s_not_enum_list must be non-empty, and the first element will be used as self
        s_not_enum = (
            set_with_str_for_keys.intersection(  # pylint:disable=no-value-for-parameter
                *s_not_enum_list
            )
        )
    if s_enum and s_not_enum:
        # (enum E) or (not-enum N) is equivalent to not-enum (N - E)
        s_not_enum = set_with_str_for_keys.difference(s_not_enum, s_enum)
        s_enum = None
    assert not s_any
    ret: List[JsonSchema] = []
    if s_enum:
        ret.append({"enum": list(s_enum)})
    if s_not_enum:
        ret.append({"not": {"enum": list(s_not_enum)}})
    ret.extend(s_other)
    ret.extend(s_not_for_optimizer)
    return makeAnyOf(ret)
def simplifyNot(schema: JsonSchema, floatAny: bool) -> JsonSchema:
    """Simplify the negation of the given schema.

    Thin wrapper around simplifyNot_, starting from a not-yet-simplified schema.
    """
    return simplifyNot_(schema, floatAny, alreadySimplified=False)
def simplifyNot_(
    schema: JsonSchema, floatAny: bool, alreadySimplified: bool = False
) -> JsonSchema:
    """alreadySimplified=true implies that schema has already been simplified"""
    if "not" in schema:
        # double negation cancels out
        return simplify(schema["not"], floatAny)
    if "anyOf" in schema:
        # De Morgan: not (A or B) == (not A) and (not B)
        negated = [{"not": sub} for sub in schema["anyOf"]]
        return simplifyAll(negated, floatAny)
    if "allOf" in schema:
        # De Morgan: not (A and B) == (not A) or (not B)
        negated = [{"not": sub} for sub in schema["allOf"]]
        return simplifyAny(negated, floatAny)
    if not alreadySimplified:
        # simplification may expose a structure that 'not' can be pushed into,
        # so try once more; the flag prevents an infinite loop.
        simplified = simplify(schema, floatAny)
        return simplifyNot_(simplified, floatAny, alreadySimplified=True)
    return {"not": schema}
def simplify(schema: JsonSchema, floatAny: bool) -> JsonSchema:
    """Tries to simplify a schema into an equivalent but
    more compact/simpler one. If floatAny if true, then
    the only anyOf in the return value will be at the top level.
    Using this option may cause a combinatorial blowup in the size
    of the schema

    BUG FIX: the required-key check used to test ``s in schema["required"]``,
    comparing the simplified sub-schema (a dict) against the list of required
    property *names*, so it could never match; it now tests the property name
    ``k``, matching the intent stated in the log message.
    """
    if is_true_schema(schema):
        return STrue
    if is_false_schema(schema):
        return SFalse
    if "enum" in schema:
        # TODO: simplify the schemas by removing anything that does not validate
        # against the rest of the schema
        return schema
    if "allOf" in schema:
        ret = simplifyAll(schema["allOf"], floatAny)
        return ret
    elif "anyOf" in schema:
        ret = simplifyAny(schema["anyOf"], floatAny)
        return ret
    elif "not" in schema:
        return simplifyNot(schema["not"], floatAny)
    elif "type" in schema and schema["type"] == "object" and "properties" in schema:
        schema2 = schema.copy()
        props = {}
        all_objs = [schema2]
        # TODO: how does this interact with required?
        # {k1:s_1, k2:anyOf:[s2s], k3:anyOf:[s3s]}
        # If floatAny is true and any properties have an anyOf in them
        # we need to float it out to the top. We can then
        # give it to simplifyAll, which does the cross product to lift
        # them out of the list
        for k, v in schema["properties"].items():
            s = simplify(v, floatAny)
            # a required property with an unsatisfiable schema makes the
            # whole object schema unsatisfiable
            if is_false_schema(s) and "required" in schema and k in schema["required"]:
                logger.info(
                    f"simplify: required key {k} is False, so the entire schema {schema} is False"
                )
                return impossible()
            if (not is_true_schema(s)) and floatAny and "anyOf" in s:
                all_objs.append(
                    {
                        "anyOf": [
                            {"type": "object", "properties": {k: vv}}
                            for vv in s["anyOf"]
                        ]
                    }
                )
                # If we are disallowing additionalProperties, then we can't remove this property entirely
                if not schema.get("additionalProperties", True):
                    props[k] = STrue
            else:
                props[k] = s
        schema2["properties"] = props
        if len(all_objs) == 1:
            return schema2
        else:
            # The termination argument here is somewhat subtle
            s = simplifyAll(all_objs, floatAny)
            return s
    else:
        return schema
# TODO: semantically, allOf should force an intersection
# of relevantFields, yet union seems kinder to the user/more modular (at least if additionalProperties:True)
def findRelevantFields(schema: JsonSchema) -> Optional[Set[str]]:
    """Either returns the relevant fields for the schema, or None if there was none specified"""
    if "allOf" in schema:
        # gather the relevant fields of each conjunct, skipping those
        # that do not specify any
        collected = [
            fields
            for fields in (findRelevantFields(sub) for sub in schema["allOf"])
            if fields is not None
        ]
        if not collected:
            return None
        return set.union(*collected)
    if "relevantToOptimizer" in schema:
        return set(schema["relevantToOptimizer"])
    return None
# does not handle nested objects and nested relevant fields well
def narrowToGivenRelevantFields(
    schema: JsonSchema, relevantFields: Set[str]
) -> JsonSchema:
    """Recursively restrict an object schema's properties (and required list)
    to the given set of relevant fields."""
    if is_true_schema(schema) or is_false_schema(schema):
        return schema
    for combinator in ("anyOf", "allOf"):
        if combinator in schema:
            return {
                combinator: [
                    narrowToGivenRelevantFields(sub, relevantFields)
                    for sub in schema[combinator]
                ]
            }
    if "not" in schema:
        return {"not": narrowToGivenRelevantFields(schema["not"], relevantFields)}
    if "type" in schema and schema["type"] == "object" and "properties" in schema:
        narrowed = schema.copy()
        narrowed["properties"] = {
            k: narrowToGivenRelevantFields(v, relevantFields)
            for (k, v) in schema["properties"].items()
            if k in relevantFields
        }
        if "required" in schema:
            reqs = set(schema["required"])
            narrowed["required"] = list(reqs.intersection(relevantFields))
        return narrowed
    return schema
def narrowToRelevantFields(schema: JsonSchema) -> JsonSchema:
    """Narrow the schema to its relevant fields, when any are specified;
    otherwise return it unchanged."""
    fields: Optional[Set[str]] = findRelevantFields(schema)
    if fields is None:
        return schema
    return narrowToGivenRelevantFields(schema, fields)
# Given a json schema, removes any elements marked as 'forOptimizer:false'
# also does some basic simplifications
def filterForOptimizer(schema: JsonSchema) -> Optional[JsonSchema]:
    """Remove any schema elements marked 'forOptimizer: false', with some
    basic simplifications; returns None if the whole schema is filtered out."""
    if schema is None or is_true_schema(schema) or is_false_schema(schema):
        return schema
    if not isForOptimizer(schema):
        return None
    if "anyOf" in schema:
        kept = [
            sub
            for sub in (filterForOptimizer(s) for s in schema["anyOf"])
            if sub is not None
        ]
        return makeAnyOf(kept) if kept else None
    if "allOf" in schema:
        filtered = [filterForOptimizer(s) for s in schema["allOf"]]
        kept = [s for s in filtered if s is not None]
        if len(kept) != len(filtered):
            # Questionable semantics here (aka HACK!!!!)
            # Since we removed something from the schema
            # we will also remove negated schemas
            kept = [s for s in kept if not isinstance(s, dict) or "not" not in s]
        return makeAllOf(kept) if kept else None
    if "oneOf" in schema:
        kept = [
            sub
            for sub in (filterForOptimizer(s) for s in schema["oneOf"])
            if sub is not None
        ]
        return makeOneOf(kept) if kept else None
    if "not" in schema:
        inner = filterForOptimizer(schema["not"])
        return None if inner is None else {"not": inner}
    if "type" in schema and schema["type"] == "object" and "properties" in schema:
        # required = schema.get("required", None)
        props = {}
        for k, v in schema["properties"].items():
            filtered_prop = filterForOptimizer(v)
            if filtered_prop is None:
                # if required and k in required:
                # if this field is required (and has now been filtered)
                # filter the whole object schema
                return None
            props[k] = filtered_prop
        ret = schema.copy()
        ret["properties"] = props
        return ret
    return schema
def narrowToRelevantConstraints(schema: JsonSchema) -> JsonSchema:
    """Recursively drop top-level conjuncts explicitly marked
    'forOptimizer: false'.

    This reduces cost in the subsequent simplify() call, which would be wasted
    work on conjuncts that a later filterForOptimizer() call drops anyway.
    Only top-level conjunctions are narrowed, to avoid tricky reasoning.
    """
    if "allOf" not in schema:
        return schema
    kept_conjuncts = [
        narrowToRelevantConstraints(sub)
        for sub in schema["allOf"]
        if sub.get("forOptimizer", True)
    ]
    return {**schema, "allOf": kept_conjuncts}
def narrowSimplifyAndFilter(schema: JsonSchema, floatAny: bool) -> Optional[JsonSchema]:
    """Full pipeline: narrow to relevant constraints and fields, simplify,
    then filter out parts not meant for the optimizer (possibly yielding None)."""
    narrowed = narrowToRelevantFields(narrowToRelevantConstraints(schema))
    simplified = simplify(narrowed, floatAny)
    return filterForOptimizer(simplified)