# Source code for lale.json_operator

# Copyright 2019-2023 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import inspect
import keyword
import logging
import re
from typing import Any, Dict, Tuple, cast

import jsonschema

import lale.operators
from lale.helpers import GenSym

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Type alias for a JSON object: a dict with string keys and arbitrary values.
JSON_TYPE = Dict[str, Any]

# JSON Schema (draft-04) for the JSON encoding of an operator.  The root of a
# document must match "#/definitions/operator" (see the "$ref" at the bottom).
SCHEMA = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "definitions": {
        # Any one of the concrete operator encodings defined below.
        "operator": {
            "anyOf": [
                {"$ref": "#/definitions/planned_individual_op"},
                {"$ref": "#/definitions/trainable_individual_op"},
                {"$ref": "#/definitions/trained_individual_op"},
                {"$ref": "#/definitions/planned_pipeline"},
                {"$ref": "#/definitions/trainable_pipeline"},
                {"$ref": "#/definitions/trained_pipeline"},
                {"$ref": "#/definitions/operator_choice"},
            ]
        },
        # Fields shared by individual operators regardless of state; the
        # state-specific definitions below refine this one via "allOf".
        "individual_op": {
            "type": "object",
            "required": ["class", "state", "operator"],
            "properties": {
                # Dotted path made of identifier-like segments.
                "class": {
                    "type": "string",
                    "pattern": "^([A-Za-z_][A-Za-z_0-9]*[.])*[A-Za-z_][A-Za-z_0-9]*$",
                },
                "state": {"enum": ["metamodel", "planned", "trainable", "trained"]},
                "operator": {"type": "string", "pattern": "^[A-Za-z_][A-Za-z_0-9]*$"},
                "label": {"type": "string", "pattern": "^[A-Za-z_][A-Za-z_0-9]*$"},
                "documentation_url": {"type": "string"},
                # Either None or an object keyed by identifier-like names.
                "hyperparams": {
                    "anyOf": [
                        {"enum": [None]},
                        {
                            "type": "object",
                            "patternProperties": {"^[A-Za-z_][A-Za-z_0-9]*$": {}},
                        },
                    ]
                },
                "steps": {
                    "description": "Nested operators in higher-order individual op.",
                    "type": "object",
                    "patternProperties": {
                        "^[a-z][a-z_0-9]*$": {"$ref": "#/definitions/operator"}
                    },
                },
                "is_frozen_trainable": {"type": "boolean"},
                "is_frozen_trained": {"type": "boolean"},
                "coefs": {"enum": [None, "coefs_not_available"]},
                # Either the marker string "not_available" or a schema object.
                "customize_schema": {
                    "anyOf": [
                        {"enum": ["not_available"]},
                        {"type": "object"},
                    ],
                },
            },
        },
        "planned_individual_op": {
            "allOf": [
                {"$ref": "#/definitions/individual_op"},
                {"type": "object", "properties": {"state": {"enum": ["planned"]}}},
            ]
        },
        # Trainable individual operators must additionally carry hyperparams
        # and the is_frozen_trainable flag.
        "trainable_individual_op": {
            "allOf": [
                {"$ref": "#/definitions/individual_op"},
                {
                    "type": "object",
                    "required": ["hyperparams", "is_frozen_trainable"],
                    "properties": {"state": {"enum": ["trainable"]}},
                },
            ]
        },
        # Trained individual operators must additionally carry hyperparams,
        # coefs, and the is_frozen_trained flag.
        "trained_individual_op": {
            "allOf": [
                {"$ref": "#/definitions/individual_op"},
                {
                    "type": "object",
                    "required": ["hyperparams", "coefs", "is_frozen_trained"],
                    "properties": {"state": {"enum": ["trained"]}},
                },
            ]
        },
        # Fields shared by pipelines regardless of state.
        "pipeline": {
            "type": "object",
            "required": ["class", "state", "edges", "steps"],
            "properties": {
                "class": {
                    "enum": [
                        "lale.operators.PlannedPipeline",
                        "lale.operators.TrainablePipeline",
                        "lale.operators.TrainedPipeline",
                    ]
                },
                "state": {"enum": ["planned", "trainable", "trained"]},
                # Each edge is a two-element array of lower-case step names.
                "edges": {
                    "type": "array",
                    "items": {
                        "type": "array",
                        "minItems": 2,
                        "maxItems": 2,
                        "items": {"type": "string", "pattern": "^[a-z][a-z_0-9]*$"},
                    },
                },
                "steps": {
                    "type": "object",
                    "patternProperties": {
                        "^[a-z][a-z_0-9]*$": {"$ref": "#/definitions/operator"}
                    },
                },
            },
        },
        "planned_pipeline": {
            "allOf": [
                {"$ref": "#/definitions/pipeline"},
                {
                    "type": "object",
                    "properties": {
                        "state": {"enum": ["planned"]},
                        "class": {"enum": ["lale.operators.PlannedPipeline"]},
                    },
                },
            ]
        },
        # Steps of a trainable pipeline must be in state trainable or trained.
        "trainable_pipeline": {
            "allOf": [
                {"$ref": "#/definitions/pipeline"},
                {
                    "type": "object",
                    "properties": {
                        "state": {"enum": ["trainable"]},
                        "class": {"enum": ["lale.operators.TrainablePipeline"]},
                        "steps": {
                            "type": "object",
                            "patternProperties": {
                                "^[a-z][a-z_0-9]*$": {
                                    "type": "object",
                                    "properties": {
                                        "state": {"enum": ["trainable", "trained"]}
                                    },
                                }
                            },
                        },
                    },
                },
            ]
        },
        # Steps of a trained pipeline must all be in state trained.
        "trained_pipeline": {
            "allOf": [
                {"$ref": "#/definitions/pipeline"},
                {
                    "type": "object",
                    "properties": {
                        "state": {"enum": ["trained"]},
                        "class": {"enum": ["lale.operators.TrainedPipeline"]},
                        "steps": {
                            "type": "object",
                            "patternProperties": {
                                "^[a-z][a-z_0-9]*$": {
                                    "type": "object",
                                    "properties": {"state": {"enum": ["trained"]}},
                                }
                            },
                        },
                    },
                },
            ]
        },
        # A choice between alternative operators; its state is always planned.
        "operator_choice": {
            "type": "object",
            "required": ["class", "state", "operator", "steps"],
            "properties": {
                "class": {"enum": ["lale.operators.OperatorChoice"]},
                "state": {"enum": ["planned"]},
                "operator": {"type": "string"},
                "steps": {
                    "type": "object",
                    "patternProperties": {
                        "^[a-z][a-z_0-9]*$": {"$ref": "#/definitions/operator"}
                    },
                },
            },
        },
    },
    "$ref": "#/definitions/operator",
}


def json_op_kind(jsn: JSON_TYPE) -> str:
    """Classify a JSON-encoded operator by its ``"class"`` entry.

    Parameters
    ----------
    jsn:
        JSON object describing an operator; must contain the key ``"class"``.

    Returns
    -------
    str
        ``"OperatorChoice"`` for ``lale.operators.OperatorChoice``,
        ``"Pipeline"`` for any of the three pipeline classes, and
        ``"IndividualOp"`` for every other class name.

    Raises
    ------
    KeyError
        If ``jsn`` has no ``"class"`` entry.
    """
    cls = jsn["class"]
    if cls == "lale.operators.OperatorChoice":
        return "OperatorChoice"
    if cls in (
        "lale.operators.PlannedPipeline",
        "lale.operators.TrainablePipeline",
        "lale.operators.TrainedPipeline",
    ):
        return "Pipeline"
    # Anything else is treated as an individual operator.
    return "IndividualOp"
def _get_state(op: "lale.operators.Operator") -> str:
    """Return the lifecycle state name of ``op``.

    The result is one of ``"trained"``, ``"trainable"``, ``"planned"`` or
    ``"metamodel"``.  The checks run in that order and the first match wins.
    NOTE(review): this ordering presumably relies on the trained/trainable/
    planned classes being related by inheritance -- confirm in lale.operators.

    Raises
    ------
    TypeError
        If ``op`` is not a ``lale.operators.Operator``.
    """
    if isinstance(op, lale.operators.TrainedOperator):
        return "trained"
    if isinstance(op, lale.operators.TrainableOperator):
        return "trainable"
    if isinstance(op, (lale.operators.PlannedOperator, lale.operators.OperatorChoice)):
        return "planned"
    if isinstance(op, lale.operators.Operator):
        return "metamodel"
    raise TypeError(f"Expected lale.operators.Operator, got {type(op)}.")


def _get_cls2label(call_depth: int) -> Dict[str, str]:
    """Map operator class names to variable names found in a calling frame.

    Looks at the stack frame ``call_depth`` entries above this function (index
    0 is this function itself) and scans its local and global variables for
    ``lale.operators.IndividualOp`` values.

    Parameters
    ----------
    call_depth:
        Index into the call stack of the frame to inspect.

    Returns
    -------
    Dict[str, str]
        Class name (``op.class_name()``) to variable name.  Empty if the stack
        is not deep enough.  Variable names that are entirely lower-case are
        never recorded.
    """
    # context=0 skips reading source lines for every frame; only the frame
    # objects themselves are needed here.
    inspect_stack = inspect.stack(0)
    try:
        if call_depth >= len(inspect_stack):
            return {}
        frame = inspect_stack[call_depth][0]
        # Globals are merged last, so on a name clash the global value is the
        # one that gets inspected.
        all_items: Dict[str, Any] = {**frame.f_locals, **frame.f_globals}
    finally:
        # The stack list contains this function's own frame; dropping it
        # promptly avoids a frame <-> local-variable reference cycle.
        del inspect_stack

    cls2label: Dict[str, str] = {}
    cls2state: Dict[str, str] = {}
    for label, op in all_items.items():
        if not isinstance(op, lale.operators.IndividualOp):
            continue
        state = _get_state(op)
        cls = op.class_name()
        if cls in cls2state:
            seen = cls2state[cls]
            # Prefer the label of a "less trained" instance; on a tie in
            # state, prefer a label that starts with an upper-case letter.
            insert = (
                (seen == "trainable" and state == "planned")
                or (seen == "trained" and state in ["trainable", "planned"])
                or (seen == state and label[0].isupper())
            )
        else:
            insert = True
        if insert and not label.islower():
            cls2label[cls] = label
            cls2state[cls] = state
    return cls2label


def _camelCase_to_snake(name):
    """Convert a CamelCase identifier to snake_case (result is lower-case)."""
    # First split before a capitalized word, then between a lower-case letter
    # or digit and a following capital.
    s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()


def _init_gensym(op: "lale.operators.Operator", cls2label: Dict[str, str]) -> GenSym:
    """Create a ``GenSym`` seeded with the names that must not be reused.

    Walks ``op`` (descending into pipelines and choices) and counts how often
    each label occurs.  Labels occurring more than once, their snake_case
    forms, Python keywords, and a few reserved names are passed to ``GenSym``.

    Raises
    ------
    ValueError
        If ``op`` or a nested step is not an individual operator, pipeline, or
        operator choice.
    """
    label2count: Dict[str, int] = {}

    def populate_label2count(op: "lale.operators.Operator"):
        if isinstance(op, lale.operators.IndividualOp):
            label = cls2label.get(op.class_name(), op.name())
        elif isinstance(op, lale.operators.BasePipeline):
            for s in op.steps_list():
                populate_label2count(s)
            label = "pipeline"
        elif isinstance(op, lale.operators.OperatorChoice):
            for s in op.steps_list():
                populate_label2count(s)
            label = "choice"
        else:
            raise ValueError(f"Unexpected argument of type: {type(op)}")
        label2count[label] = label2count.get(label, 0) + 1

    populate_label2count(op)
    non_unique_labels = {ll for ll, c in label2count.items() if c > 1}
    snakes = {_camelCase_to_snake(ll) for ll in non_unique_labels}
    return GenSym(
        {"lale", "make_pipeline", "make_union", "make_choice"}
        | set(keyword.kwlist)
        | non_unique_labels
        | snakes
    )


def _hps_to_json_rec(
    hps,
    cls2label: Dict[str, str],
    gensym: GenSym,
    steps,
    add_custom_default: bool,
) -> Any:
    """Recursively convert hyperparameter values to their JSON form.

    Operators nested inside hyperparameters are serialized into ``steps``
    (mutated in place, keyed by the generated uid) and replaced by a
    ``{"$ref": "../steps/<uid>"}`` object.  Dicts, tuples and lists are
    converted element-wise, keeping their container type; every other value
    is returned unchanged.
    """
    if isinstance(hps, lale.operators.Operator):
        step_uid, step_jsn = _op_to_json_rec(hps, cls2label, gensym, add_custom_default)
        steps[step_uid] = step_jsn
        return {"$ref": f"../steps/{step_uid}"}
    elif isinstance(hps, dict):
        return {
            hp_name: _hps_to_json_rec(
                hp_val, cls2label, gensym, steps, add_custom_default
            )
            for hp_name, hp_val in hps.items()
        }
    elif isinstance(hps, tuple):
        return tuple(
            _hps_to_json_rec(hp_val, cls2label, gensym, steps, add_custom_default)
            for hp_val in hps
        )
    elif isinstance(hps, list):
        return [
            _hps_to_json_rec(hp_val, cls2label, gensym, steps, add_custom_default)
            for hp_val in hps
        ]
    else:
        return hps


def _get_customize_schema(after, before):
    """Describe how schema ``after`` differs from schema ``before``.

    Returns
    -------
    dict or str
        * ``{}`` if the two schemas are equal.
        * A schema fragment of the form
          ``{"properties": {"hyperparams": {"allOf": [{"type": "object",
          "properties": <diff>}]}}}`` when the schemas differ only in the
          properties of the first ``allOf`` entry of the hyperparameter
          schema; ``<diff>`` holds the hyperparameters that are new or changed
          in ``after``.
        * ``"not_available"`` in every other case, including when either
          schema is ``None`` or lacks the expected nesting.
    """
    if after == before:
        return {}
    if after is None or before is None:
        return "not_available"

    def dict_equal_modulo(d1, d2, mod):
        # True if both dicts agree on every key other than ``mod``.
        for k in d1.keys():
            if k != mod and (k not in d2 or d1[k] != d2[k]):
                return False
        for k in d2.keys():
            if k != mod and k not in d1:
                return False
        return True

    def list_equal_modulo(l1, l2, mod):
        # True if both lists have equal length and agree at every index
        # other than ``mod``.
        if len(l1) != len(l2):
            return False
        for i, (v1, v2) in enumerate(zip(l1, l2)):
            if i != mod and v1 != v2:
                return False
        return True

    try:
        if not dict_equal_modulo(after, before, "properties"):
            return "not_available"
        after, before = after["properties"], before["properties"]
        if not dict_equal_modulo(after, before, "hyperparams"):
            return "not_available"
        after = after["hyperparams"]["allOf"]
        before = before["hyperparams"]["allOf"]
        if not list_equal_modulo(after, before, 0):
            return "not_available"
        after, before = after[0], before[0]
        if not dict_equal_modulo(after, before, "properties"):
            return "not_available"
        after, before = after["properties"], before["properties"]
    except (KeyError, IndexError):
        # One of the schemas does not have the
        # properties/hyperparams/allOf[0]/properties nesting, so the
        # difference cannot be expressed as a hyperparameter customization.
        return "not_available"

    # TODO: only supports customizing the schema for individual hyperparams
    hp_diff = {
        hp_name: hp_schema
        for hp_name, hp_schema in after.items()
        if hp_name not in before or hp_schema != before[hp_name]
    }
    result = {
        "properties": {
            "hyperparams": {"allOf": [{"type": "object", "properties": hp_diff}]}
        }
    }
    return result


def _top_schemas_to_hparams(top_level_schemas) -> JSON_TYPE:
    """Return ``properties.hyperparams`` of a top-level schema, or ``{}``.

    ``{}`` is returned when the argument is not a dict or the entries are
    missing.
    """
    if not isinstance(top_level_schemas, dict):
        return {}
    return top_level_schemas.get("properties", {}).get("hyperparams", {})


def _hparams_schemas_to_props(hparams_schemas) -> JSON_TYPE:
    """Return ``allOf[0].properties`` of a hyperparameter schema, or ``{}``.

    ``{}`` is returned when the argument is not a dict, when ``allOf`` is
    missing or empty, or when its first entry has no ``properties``.
    """
    if not isinstance(hparams_schemas, dict):
        return {}
    all_of = hparams_schemas.get("allOf", [{}])
    if not all_of:
        # Empty (or None) allOf: nothing to look into.
        return {}
    return all_of[0].get("properties", {})


def _top_schemas_to_hp_props(top_level_schemas) -> JSON_TYPE:
    """Return the per-hyperparameter schemas of a top-level schema, or ``{}``."""
    hparams = _top_schemas_to_hparams(top_level_schemas)
    props = _hparams_schemas_to_props(hparams)
    return props


def _op_to_json_rec(
    op: "lale.operators.Operator",
    cls2label: Dict[str, str],
    gensym: GenSym,
    add_custom_default: bool,
) -> Tuple[str, JSON_TYPE]:
    """Serialize ``op`` (and, recursively, its steps) to JSON.

    Parameters
    ----------
    op:
        Individual operator, pipeline, or operator choice to serialize.
    cls2label:
        Preferred label per operator class name; falls back to ``op.name()``.
    gensym:
        Generator of unique step identifiers.
    add_custom_default:
        If true, defaults introduced by a customized schema are written into
        ``"hyperparams"`` for hyperparameters that are not already present
        there and whose customized default differs from (or has no
        counterpart in) the library schema.

    Returns
    -------
    Tuple[str, JSON_TYPE]
        The generated unique identifier for ``op`` and its JSON object.

    Raises
    ------
    ValueError
        If ``op`` is none of the supported operator kinds.
    """
    jsn: JSON_TYPE = {}
    jsn["class"] = op.class_name()
    jsn["state"] = _get_state(op)
    if isinstance(op, lale.operators.IndividualOp):
        jsn["operator"] = op.name()
        jsn["label"] = cls2label.get(op.class_name(), op.name())
        uid = gensym(_camelCase_to_snake(jsn["label"]))
        documentation_url = op.documentation_url()
        if documentation_url is not None:
            jsn["documentation_url"] = documentation_url
        if isinstance(op, lale.operators.TrainableIndividualOp):
            if hasattr(op._impl, "viz_label"):
                jsn["viz_label"] = op._impl.viz_label()
            hyperparams = op.reduced_hyperparams()
            if hyperparams is None:
                jsn["hyperparams"] = {} if hasattr(op._impl, "fit") else None
            else:
                hp_schema = _hparams_schemas_to_props(op.hyperparam_schema())
                # Drop hyperparameters whose schema marks them transient.
                hyperparams = {
                    k: v
                    for k, v in hyperparams.items()
                    if hp_schema.get(k, {}).get("transient", False) is not True
                }
                # "alwaysPrint" hyperparameters are emitted with their default
                # even when they were not set explicitly.
                for k, s in hp_schema.items():
                    if s.get("transient", False) == "alwaysPrint":
                        if k not in hyperparams and "default" in s:
                            hyperparams[k] = s["default"]
                steps: Dict[str, JSON_TYPE] = {}
                jsn["hyperparams"] = _hps_to_json_rec(
                    hyperparams, cls2label, gensym, steps, add_custom_default
                )
                if steps:
                    jsn["steps"] = steps
            jsn["is_frozen_trainable"] = op.is_frozen_trainable()
        if isinstance(op, lale.operators.TrainedIndividualOp):
            # Learned coefficients are never serialized; only record whether
            # there would have been any.
            if hasattr(op._impl, "fit"):
                jsn["coefs"] = "coefs_not_available"
            else:
                jsn["coefs"] = None
            jsn["is_frozen_trained"] = op.is_frozen_trained()
        orig_schemas = lale.operators.get_lib_schemas(op.impl_class)
        if op._schemas is not orig_schemas:
            jsn["customize_schema"] = _get_customize_schema(op._schemas, orig_schemas)
        if add_custom_default and isinstance(
            jsn.get("customize_schema", None), dict
        ):
            if isinstance(jsn.get("hyperparams", None), dict):
                assert jsn["hyperparams"] is not None  # to help pyright
                orig = _top_schemas_to_hp_props(orig_schemas)
                cust = _top_schemas_to_hp_props(jsn["customize_schema"])
                for hp_name, hp_schema in cust.items():
                    if "default" in hp_schema:
                        if hp_name not in jsn["hyperparams"]:
                            cust_default = hp_schema["default"]
                            if hp_name in orig and "default" in orig[hp_name]:
                                orig_default = orig[hp_name]["default"]
                                if cust_default != orig_default:
                                    jsn["hyperparams"][hp_name] = cust_default
                            else:
                                jsn["hyperparams"][hp_name] = cust_default
    elif isinstance(op, lale.operators.BasePipeline):
        uid = gensym("pipeline")
        child2uid: Dict[lale.operators.Operator, str] = {}
        child2jsn: Dict[lale.operators.Operator, JSON_TYPE] = {}
        for child in op.steps_list():
            child_uid, child_jsn = _op_to_json_rec(
                child, cls2label, gensym, add_custom_default
            )
            child2uid[child] = child_uid
            child2jsn[child] = child_jsn
        jsn["edges"] = [[child2uid[x], child2uid[y]] for x, y in op.edges()]
        jsn["steps"] = {child2uid[z]: child2jsn[z] for z in op.steps_list()}
    elif isinstance(op, lale.operators.OperatorChoice):
        jsn["operator"] = "OperatorChoice"
        uid = gensym("choice")
        jsn["state"] = "planned"
        jsn["steps"] = {}
        for step in op.steps_list():
            child_uid, child_jsn = _op_to_json_rec(
                step, cls2label, gensym, add_custom_default
            )
            jsn["steps"][child_uid] = child_jsn
    else:
        raise ValueError(f"Unexpected argument of type: {type(op)}")
    return uid, jsn
def to_json(
    op: "lale.operators.Operator",
    call_depth: int = 1,
    add_custom_default: bool = False,
) -> JSON_TYPE:
    """Serialize an operator to its JSON representation.

    Parameters
    ----------
    op:
        Operator (individual operator, pipeline, or choice) to serialize.
    call_depth:
        How many stack frames above this function to look for variables whose
        names are used as operator labels; the default ``1`` is the direct
        caller.
    add_custom_default:
        If true, defaults introduced by customized schemas are written
        explicitly into the serialized hyperparameters.

    Returns
    -------
    JSON_TYPE
        JSON object for ``op``.  Unless
        ``lale.settings.disable_hyperparams_schema_validation`` is truthy, the
        result has been validated against ``SCHEMA``.

    Raises
    ------
    jsonschema.ValidationError
        If validation is enabled and the generated JSON violates ``SCHEMA``.
    """
    from lale.settings import disable_hyperparams_schema_validation

    # Must be called directly from this frame: the "+ 1" accounts for
    # to_json's own stack entry.
    cls2label = _get_cls2label(call_depth + 1)
    gensym = _init_gensym(op, cls2label)
    _uid, jsn = _op_to_json_rec(op, cls2label, gensym, add_custom_default)
    if not disable_hyperparams_schema_validation:
        jsonschema.validate(jsn, SCHEMA, jsonschema.Draft4Validator)
    return jsn
def _hps_from_json_rec(jsn: Any, steps: JSON_TYPE) -> Any:
    """Recursively convert JSON hyperparameter values back to Python values.

    A dict containing the key ``"$ref"`` is replaced by the operator
    reconstructed from the step whose uid is the last ``/``-separated segment
    of the reference.  Other dicts, tuples and lists are converted
    element-wise, keeping their container type; every other value is returned
    unchanged.

    Raises
    ------
    KeyError
        If a reference names a uid that is not in ``steps``.
    """
    if isinstance(jsn, dict):
        if "$ref" in jsn:
            step_uid = jsn["$ref"].split("/")[-1]
            step_jsn = steps[step_uid]
            return _op_from_json_rec(step_jsn)
        else:
            return {k: _hps_from_json_rec(v, steps) for k, v in jsn.items()}
    elif isinstance(jsn, tuple):
        return tuple(_hps_from_json_rec(v, steps) for v in jsn)
    elif isinstance(jsn, list):
        return [_hps_from_json_rec(v, steps) for v in jsn]
    else:
        return jsn


def _import_impl_class(full_class_name: str) -> Any:
    """Import and return the object named by a dotted ``module.attr`` path.

    Raises
    ------
    ImportError
        If ``full_class_name`` has no module part, or the module cannot be
        imported.
    AttributeError
        If the module has no attribute with the given name.
    """
    module_name, _, attr_name = full_class_name.rpartition(".")
    if not module_name:
        # Without this check a dot-less name would be split at index -1 and
        # an unrelated, truncated module name would be imported.
        raise ImportError(
            f"Expected a fully qualified class name of the form 'module.Class', got {full_class_name!r}."
        )
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


def _op_from_json_rec(jsn: JSON_TYPE) -> "lale.operators.Operator":
    """Reconstruct an operator from its JSON representation.

    Pipelines and operator choices are rebuilt recursively from their steps.
    Individual operators are rebuilt by importing the implementation class
    named in ``jsn["class"]``, applying any schema customization, and then
    configuring hyperparameters and the frozen flag for trainable and trained
    states.

    A trained operator whose JSON has ``"coefs": "coefs_not_available"``
    cannot be restored as trained; a warning is logged and the returned
    operator is not in the trained state.
    """
    kind = json_op_kind(jsn)
    if kind == "Pipeline":
        steps_dict = {uid: _op_from_json_rec(jsn["steps"][uid]) for uid in jsn["steps"]}
        steps = list(steps_dict.values())
        edges = [(steps_dict[x], steps_dict[y]) for (x, y) in jsn["edges"]]
        return lale.operators.make_pipeline_graph(steps, edges)
    if kind == "OperatorChoice":
        steps = [_op_from_json_rec(s) for s in jsn["steps"].values()]
        name = jsn["operator"]
        return lale.operators.OperatorChoice(steps, name)

    assert kind == "IndividualOp"
    # NOTE(security): the module to import is chosen by the JSON input, and
    # importing a module executes its top-level code.  Only deserialize JSON
    # from trusted sources.
    impl = _import_impl_class(jsn["class"])
    schemas = lale.operators.get_lib_schemas(impl)
    name = jsn["operator"]
    result = lale.operators.make_operator(impl, schemas, name)
    if jsn.get("customize_schema", {}) != {}:
        new_hps = _top_schemas_to_hp_props(jsn["customize_schema"])
        result = result.customize_schema(**new_hps)
    if jsn["state"] in ["trainable", "trained"]:
        if _get_state(result) == "planned":
            hps = jsn["hyperparams"]
            if hps is None:
                result = result()
            else:
                hps = _hps_from_json_rec(hps, jsn.get("steps", {}))
                result = result(**hps)
        trnbl = cast(lale.operators.TrainableIndividualOp, result)
        if jsn["is_frozen_trainable"] and not trnbl.is_frozen_trainable():
            trnbl = trnbl.freeze_trainable()
        assert jsn["is_frozen_trainable"] == trnbl.is_frozen_trainable()
        result = trnbl
    if jsn["state"] == "trained":
        if jsn["coefs"] == "coefs_not_available":
            logger.warning(
                f"Since the JSON representation of trained operator {name} lacks coefficients, from_json returns a trainable operator instead."
            )
        else:
            assert jsn["coefs"] is None, jsn["coefs"]
    # The reconstructed state must match, except for the documented downgrade
    # of trained operators without coefficients.
    assert (
        _get_state(result) == jsn["state"]
        or jsn["state"] == "trained"
        and jsn["coefs"] == "coefs_not_available"
    )
    if "documentation_url" in jsn:
        assert result.documentation_url() == jsn["documentation_url"]
    return result
def from_json(jsn: JSON_TYPE) -> "lale.operators.Operator":
    """Reconstruct an operator from its JSON representation.

    The input is first validated against ``SCHEMA`` (JSON Schema draft-04)
    and then converted by ``_op_from_json_rec``.

    Parameters
    ----------
    jsn:
        JSON object describing an operator.

    Returns
    -------
    lale.operators.Operator
        The reconstructed operator.

    Raises
    ------
    jsonschema.ValidationError
        If ``jsn`` does not conform to ``SCHEMA``.
    """
    jsonschema.validate(jsn, SCHEMA, jsonschema.Draft4Validator)
    result = _op_from_json_rec(jsn)
    return result