# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import re
from typing import Dict, Iterable, List, Optional
from hyperopt import hp
from hyperopt.pyll import scope
from lale import helpers
from lale.helpers import make_indexed_name
from lale.operators import Operator
from lale.search.PGO import FrequencyDistribution
from lale.search.search_space import (
SearchSpace,
SearchSpaceArray,
SearchSpaceDict,
SearchSpaceEmpty,
SearchSpaceEnum,
SearchSpaceError,
SearchSpaceNumber,
SearchSpaceObject,
SearchSpaceOperator,
SearchSpaceProduct,
SearchSpaceSum,
)
from lale.util.Visitor import Visitor, accept
def search_space_to_hp_expr(space: SearchSpace, name: str):
    """Compile ``space`` into a hyperopt expression.

    Thin entry point around ``SearchSpaceHPExprVisitor.run``; ``name`` is
    handed to the visitor as the starting label path.
    """
    hp_expr = SearchSpaceHPExprVisitor.run(space, name)
    return hp_expr
def search_space_to_hp_str(space: SearchSpace, name: str) -> str:
    """Render ``space`` as hyperopt-style source text.

    Thin entry point around ``SearchSpaceHPStrVisitor.run`` using that
    method's default ``counter``/``useCounter`` settings.
    """
    rendered: str = SearchSpaceHPStrVisitor.run(space, name)
    return rendered
def search_space_to_str_for_comparison(space: SearchSpace, name: str) -> str:
    """Render ``space`` as text with label counters switched off.

    Same renderer as ``search_space_to_hp_str``, but invoked with
    ``counter=None`` and ``useCounter=False``.
    """
    options = {"counter": None, "useCounter": False}
    return SearchSpaceHPStrVisitor.run(space, name, **options)
def _mk_label(label, counter, useCounter=True):
if counter is None or not useCounter:
return label
else:
return f"{label}{counter}"
@scope.define
def pgo_sample(pgo, sample):
    """Look up ``sample`` in ``pgo`` by indexing and return the result.

    Registered through ``scope.define`` so it can be used as
    ``scope.pgo_sample(...)`` inside hyperopt expressions.
    """
    picked = pgo[sample]
    return picked
@scope.define
def make_nested_hyperopt(space):
    """Forward ``space`` to ``helpers.make_nested_hyperopt_space``.

    Registered through ``scope.define`` so it can be used as
    ``scope.make_nested_hyperopt(...)`` inside hyperopt expressions.
    """
    nested = helpers.make_nested_hyperopt_space(space)
    return nested
class SearchSpaceHPExprVisitor(Visitor):
    """Visitor that compiles a ``SearchSpace`` tree into a hyperopt expression.

    Every ``visit*`` method receives the sub-space being compiled, the label
    ``path`` accumulated so far, and an optional ``counter`` that is appended
    to the path when building a label.  Labels handed to hyperopt are passed
    through :meth:`get_unique_name` so that a repeated label gets a ``$<n>``
    suffix instead of being reused verbatim.
    """

    # label -> how many times it has been requested again after its first use
    names: Dict[str, int]

    @classmethod
    def run(cls, space: SearchSpace, name: str):
        """Compile ``space`` with a fresh visitor, using ``name`` as root path."""
        visitor = cls(name)
        return accept(space, visitor, name)

    def __init__(self, name: str):
        # ``name`` is accepted for signature compatibility; it is not stored.
        super().__init__()
        self.names = {}

    def get_unique_name(self, name: str) -> str:
        """Return ``name`` on first use, ``name$<n>`` on every later request."""
        if name not in self.names:
            self.names[name] = 0
            return name
        counter = self.names[name] + 1
        self.names[name] = counter
        return f"{name}${counter}"

    def mk_label(self, label, counter, useCounter=True):
        """Build ``label`` + ``counter`` and make the result unique."""
        return self.get_unique_name(_mk_label(label, counter, useCounter=useCounter))

    def visitSearchSpaceEnum(self, space: SearchSpaceEnum, path: str, counter=None):
        """Compile an enumeration: a bare value, or ``hp.choice`` over values."""

        def as_hp_vals(v):
            # Lists are not "safe" to pass to hyperopt without wrapping
            if isinstance(v, (list, tuple, Operator)):
                return helpers.val_wrapper(v)
            return v

        if len(space.vals) == 1:
            # A single value needs no choice node (and no label is consumed).
            return as_hp_vals(space.vals[0])
        return hp.choice(
            self.mk_label(path, counter), [as_hp_vals(v) for v in space.vals]
        )

    visitSearchSpaceConstant = visitSearchSpaceEnum
    visitSearchSpaceBool = visitSearchSpaceEnum

    def visitSearchSpaceNumber(self, space: SearchSpaceNumber, path: str, counter=None):
        """Compile a numeric space according to its PGO data or distribution.

        Raises:
            SearchSpaceError: if a required bound is missing, if the
                ``integer`` distribution is requested for a non-discrete
                space, if a ``loguniform`` minimum is not positive, or if the
                distribution name is unknown.
        """
        # The label is reserved up front, before any validation.
        label = self.mk_label(path, counter)

        if space.pgo is not None:
            # Sample an index into the PGO table and map it via pgo_sample.
            return scope.pgo_sample(
                space.pgo, hp.quniform(label, 0, len(space.pgo) - 1, 1)
            )

        dist = space.distribution if space.distribution else "uniform"

        if space.maximum is None:
            raise SearchSpaceError(
                path, f"maximum not specified for a number with distribution {dist}"
            )
        space_max = space.getInclusiveMax()
        # if the maximum is not None, the inclusive maximum should not be none
        assert space_max is not None

        # These distributions need only a maximum
        if dist == "integer":
            if not space.discrete:
                raise SearchSpaceError(
                    path,
                    "integer distribution specified for a non discrete numeric type",
                )
            # NOTE(review): hp.randint presumably treats its bound as
            # exclusive while space_max is the inclusive maximum -- confirm
            # whether space_max + 1 is intended here.
            return hp.randint(label, space_max)

        if space.minimum is None:
            raise SearchSpaceError(
                path, f"minimum not specified for a number with distribution {dist}"
            )
        space_min = space.getInclusiveMin()
        # if the minimum is not None, the inclusive minimum should not be none
        assert space_min is not None

        if dist == "uniform":
            if space.discrete:
                return scope.int(hp.quniform(label, space_min, space_max, 1))
            return hp.uniform(label, space_min, space_max)

        if dist == "loguniform":
            # for log distributions, hyperopt requires that we provide the log of the min/max
            if space_min <= 0:
                raise SearchSpaceError(
                    path,
                    f"minimum of 0 specified with a {dist} distribution.  This is not allowed; please set it (possibly using minimumForOptimizer) to be positive",
                )
            # space_min is known to be positive at this point.
            space_min = math.log(space_min)
            if space_max > 0:
                space_max = math.log(space_max)
            if space.discrete:
                return scope.int(hp.qloguniform(label, space_min, space_max, 1))
            return hp.loguniform(label, space_min, space_max)

        raise SearchSpaceError(path, f"Unknown distribution type: {dist}")

    def array_single_expr_(self, space: SearchSpaceArray, path: str, num):
        """Compile the items of ``space`` under the prefix ``path`` + ``num`` + "_".

        Returns a tuple when ``space.is_tuple`` is set, otherwise a list.
        """
        p = _mk_label(path, num) + "_"
        items: Iterable[SearchSpace] = space.items()
        ret = [accept(sub, self, p, counter=x) for x, sub in enumerate(items)]
        return tuple(ret) if space.is_tuple else ret

    def visitSearchSpaceArray(self, space: SearchSpaceArray, path: str, counter=None):
        """Compile an array space.

        When ``minimum == maximum`` a single expression is produced for that
        value; otherwise an ``hp.choice`` over one expression per value in
        ``minimum..maximum`` (inclusive) is produced.
        """
        assert space.maximum >= space.minimum
        p = _mk_label(path, counter)
        cp = p + "_"
        if space.minimum == space.maximum:
            return self.array_single_expr_(space, cp, space.minimum)

        # Route the choice label through get_unique_name like every other
        # label this visitor emits, so that two arrays reached via the same
        # path cannot end up sharing one label.  The first use of a label is
        # returned unchanged, so existing labels are unaffected.
        choice_label = self.get_unique_name(p)
        exprs = [
            self.array_single_expr_(space, cp, x)
            for x in range(space.minimum, space.maximum + 1)
        ]
        return hp.choice(choice_label, exprs)

    def visitSearchSpaceObject(self, space: SearchSpaceObject, path: str, counter=None):
        """Compile an object space into a dict of hyperopt expressions.

        The result maps ``"name"`` to ``space.longName`` and every key in
        ``space.keys`` to the matching position of one ``hp.choice`` over all
        of ``space.choices``.
        """
        search_space = {}
        # Reserve the label of the combined choice before visiting children.
        any_path = self.get_unique_name(_mk_label(path, counter) + "_" + "combos")
        search_space["name"] = space.longName

        # None for the first non-empty child, then 1, 2, ... for later ones.
        child_counter = None

        def asexpr(key, e):
            nonlocal child_counter
            if e is None:
                return None
            ee = accept(e, self, path + "_" + key, counter=child_counter)
            child_counter = 1 if child_counter is None else child_counter + 1
            return ee

        def choice_as_tuple_expr(c):
            assert len(space.keys) == len(c)
            return [asexpr(space.keys[ind], e) for ind, e in enumerate(c)]

        choices = [choice_as_tuple_expr(c) for c in space.choices]
        valid_hyperparam_combinations = hp.choice(any_path, choices)
        for i, k in enumerate(space.keys):
            search_space[k] = valid_hyperparam_combinations[i]
        return search_space

    def visitSearchSpaceDict(self, sd: SearchSpaceDict, path: str, counter=None):
        """Compile each entry of ``sd.space_dict`` under ``path`` + "_" + key."""
        return {
            name: accept(space, self, path + "_" + name)
            for name, space in sd.space_dict.items()
        }

    def visitSearchSpaceProduct(
        self, prod: SearchSpaceProduct, path: str, counter=None
    ):
        """Compile each indexed sub-space of ``prod`` into a list entry.

        Each sub-space is visited with a unique path derived from its name
        and index; the incoming ``path`` is not used.
        """
        return [
            accept(space, self, self.get_unique_name(make_indexed_name(name, index)))
            for name, index, space in prod.get_indexed_spaces()
        ]

    def visitSearchSpaceSum(self, space_sum: SearchSpaceSum, path: str, counter=None):
        """Compile a sum space.

        A single alternative is compiled directly; several alternatives
        become an ``hp.choice`` over one-entry dicts keyed by the
        alternative's index as a string.  Alternatives are visited with an
        empty path.
        """
        if len(space_sum.sub_spaces) == 1:
            return accept(space_sum.sub_spaces[0], self, "")
        unique_name: str = self.get_unique_name("choice")
        return hp.choice(
            unique_name,
            [{str(i): accept(m, self, "")} for i, m in enumerate(space_sum.sub_spaces)],
        )

    def visitSearchSpaceOperator(
        self, op: SearchSpaceOperator, path: str, counter=None
    ):
        """Compile ``op.sub_space`` and wrap it in ``scope.make_nested_hyperopt``."""
        return scope.make_nested_hyperopt(accept(op.sub_space, self, path))

    def visitSearchSpaceEmpty(self, op: SearchSpaceEmpty, path: str, counter=None):
        """Reject empty search spaces.

        Raises:
            SearchSpaceError: always.
        """
        raise SearchSpaceError(
            path, "The hyperopt backend can't compile an empty (sub-) search space"
        )
class SearchSpaceHPStrVisitor(Visitor):
    """Visitor that renders a ``SearchSpace`` tree as hyperopt-style text.

    Visiting returns the text of an expression.  Object spaces additionally
    append assignment statements to ``decls``; :meth:`run` stitches optional
    helper headers, the declarations and a final ``return <expr>`` line
    together.
    """

    # label -> PGO frequency distribution registered for that label
    pgo_dict: Dict[str, FrequencyDistribution]
    # name -> how many times it has been requested again after its first use
    names: Dict[str, int]
    # text defining ``pgo_sample``; set once PGO declarations are emitted
    pgo_header: Optional[str]
    # text defining ``make_nested_hyperopt``; set once an operator is visited
    nested_header: Optional[str]
    # accumulated assignment statements
    decls: str

    @classmethod
    def run(cls, space: SearchSpace, name: str, counter=None, useCounter=True):
        """Render ``space`` and return headers + declarations + return line."""
        visitor = cls(name)
        body = accept(space, visitor, name, counter=counter, useCounter=useCounter)
        pieces: List[str] = []
        if visitor.pgo_header is not None:
            pieces.append(visitor.pgo_header)
        if visitor.nested_header is not None:
            pieces.append(visitor.nested_header)
        if visitor.decls:
            pieces.append(visitor.decls + "\n")
        pieces.append("return " + body)
        return "".join(pieces)

    def _next_name(self, name: str, separator: str) -> str:
        # Shared bookkeeping for labels and variable names: the first request
        # yields ``name`` itself, later ones ``name<separator><n>``.
        if name not in self.names:
            self.names[name] = 0
            return name
        counter = self.names[name] + 1
        self.names[name] = counter
        return f"{name}{separator}{counter}"

    def get_unique_name(self, name: str) -> str:
        """Return ``name`` on first use, ``name$<n>`` on later requests."""
        return self._next_name(name, "$")

    def get_unique_variable_name(self, name: str) -> str:
        """Return ``name`` on first use, ``name__<n>`` on later requests.

        Shares its counters with :meth:`get_unique_name`.
        """
        return self._next_name(name, "__")

    def mk_label(self, label, counter, useCounter=True):
        """Build ``label`` + ``counter`` and make the result unique."""
        return self.get_unique_name(_mk_label(label, counter, useCounter=useCounter))

    def __init__(self, name: str):
        # ``name`` is accepted for signature compatibility; it is not stored.
        super().__init__()
        self.pgo_dict = {}
        self.names = {}
        self.pgo_header = None
        self.nested_header = None
        self.decls = ""

    def visitSearchSpaceEnum(
        self, space: SearchSpaceEnum, path: str, counter=None, useCounter=True
    ):
        """Render an enumeration: a bare value, or ``hp.choice`` over values."""

        def val_as_str(v):
            if v is None:
                return "null"
            if isinstance(v, str):
                return f"'{v}'"
            return str(v)

        if len(space.vals) == 1:
            # A single value needs no choice node (and no label is consumed).
            return val_as_str(space.vals[0])
        vals_str = "[" + ", ".join([val_as_str(v) for v in space.vals]) + "]"
        return f"hp.choice('{self.mk_label(path, counter, useCounter)}', {vals_str})"

    visitSearchSpaceConstant = visitSearchSpaceEnum
    visitSearchSpaceBool = visitSearchSpaceEnum

    def visitSearchSpaceNumber(
        self, space: SearchSpaceNumber, path: str, counter=None, useCounter=True
    ):
        """Render a numeric space according to its PGO data or distribution.

        Raises:
            SearchSpaceError: if a required bound is missing, if the
                ``integer`` distribution is requested for a non-discrete
                space, if a ``loguniform`` minimum is not positive, or if the
                distribution name is unknown.
        """
        # The label is reserved up front, before any validation.
        label = self.mk_label(path, counter, useCounter=useCounter)

        if space.pgo is not None:
            # Remember the distribution so visitSearchSpaceObject can emit
            # the matching ``pgo_<label>`` declaration.
            self.pgo_dict[label] = space.pgo
            return f"scope.pgo_sample(pgo_{label}, hp.quniform('{label}', 0, {len(space.pgo) - 1}, 1))"

        dist = space.distribution if space.distribution else "uniform"

        if space.maximum is None:
            raise SearchSpaceError(
                path, f"maximum not specified for a number with distribution {dist}"
            )
        space_max = space.getInclusiveMax()
        # if the maximum is not None, the inclusive maximum should not be none
        assert space_max is not None

        # These distributions need only a maximum
        if dist == "integer":
            if not space.discrete:
                raise SearchSpaceError(
                    path,
                    "integer distribution specified for a non discrete numeric type....",
                )
            return f"hp.randint('{label}', {space_max})"

        if space.minimum is None:
            raise SearchSpaceError(
                path, f"minimum not specified for a number with distribution {dist}"
            )
        space_min = space.getInclusiveMin()
        # if the minimum is not None, the inclusive minimum should not be none
        assert space_min is not None

        if dist == "uniform":
            if space.discrete:
                return f"hp.quniform('{label}', {space_min}, {space_max}, 1)"
            return f"hp.uniform('{label}', {space_min}, {space_max})"

        if dist == "loguniform":
            # for log distributions, hyperopt requires that we provide the log of the min/max
            if space_min <= 0:
                raise SearchSpaceError(
                    path,
                    f"minimum of 0 specified with a {dist} distribution.  This is not allowed; please set it (possibly using minimumForOptimizer) to be positive",
                )
            # space_min is known to be positive at this point.
            space_min = math.log(space_min)
            if space_max > 0:
                space_max = math.log(space_max)
            if space.discrete:
                return f"hp.qloguniform('{label}', {space_min}, {space_max}, 1)"
            return f"hp.loguniform('{label}', {space_min}, {space_max})"

        raise SearchSpaceError(path, f"Unknown distribution type: {dist}")

    def array_single_str_(
        self, space: SearchSpaceArray, path: str, num, useCounter=True
    ) -> str:
        """Render the items of ``space`` as a bracketed, comma-joined list.

        Parentheses are used when ``space.is_tuple`` is set, square brackets
        otherwise.
        """
        p = _mk_label(path, num, useCounter=useCounter) + "_"
        opener, closer = ("(", ")") if space.is_tuple else ("[", "]")
        items: Iterable[SearchSpace] = space.items()
        rendered = ",".join(
            accept(sub, self, p, counter=x, useCounter=useCounter)
            for x, sub in enumerate(items)
        )
        return opener + rendered + closer

    def visitSearchSpaceArray(
        self, space: SearchSpaceArray, path: str, counter=None, useCounter=True
    ) -> str:
        """Render an array space.

        When ``minimum == maximum`` a single list/tuple is rendered for that
        value; otherwise an ``hp.choice`` over one rendering per value in
        ``minimum..maximum`` (inclusive) is produced.
        """
        assert space.maximum >= space.minimum
        p = _mk_label(path, counter, useCounter=useCounter)
        cp = p + "_"
        if space.minimum == space.maximum:
            return self.array_single_str_(
                space, cp, space.minimum, useCounter=useCounter
            )
        # NOTE(review): unlike the other hp.choice renderings in this class,
        # the label is emitted without quotes; kept as-is because callers may
        # compare the rendered text.
        alternatives = ",".join(
            self.array_single_str_(space, cp, x, useCounter=useCounter)
            for x in range(space.minimum, space.maximum + 1)
        )
        return "hp.choice(" + p + ", [" + alternatives + "])"

    def visitSearchSpaceObject(
        self, space: SearchSpaceObject, path: str, counter=None, useCounter=True
    ):
        """Render an object space as declarations and return its variable name.

        Appends to ``self.decls``: any registered PGO tables, then the
        statements that create the dict variable, set its ``'name'`` entry,
        define one ``hp.choice`` over all of ``space.choices`` and assign one
        dict entry per key in ``space.keys``.
        """
        space_name = self.get_unique_variable_name("search_space")
        any_name = self.get_unique_variable_name("valid_hyperparam_combinations")
        # Reserve the label of the combined choice before visiting children.
        any_path = self.get_unique_name(
            _mk_label(path, counter, useCounter=useCounter) + "_" + "combos"
        )

        s_decls: List[str] = [
            space_name + " = {}",
            f"{space_name}['name'] = {space.longName}",
        ]

        # None for the first non-empty child, then 1, 2, ... for later ones.
        child_counter = None

        def cstr(key, x):
            nonlocal child_counter
            if x is None:
                return "None"
            s = accept(x, self, path + "_" + key, child_counter, useCounter=useCounter)
            child_counter = 1 if child_counter is None else child_counter + 1
            return s

        def choice_as_tuple_str(c):
            assert len(space.keys) == len(c)
            return [cstr(space.keys[ind], e) for ind, e in enumerate(c)]

        str_choices = (
            "["
            + ",".join(
                ["(" + ",".join(choice_as_tuple_str(c)) + ")" for c in space.choices]
            )
            + "]"
        )
        s_decls.append(f"{any_name} = hp.choice('{any_path}', {str_choices})")
        for i, k in enumerate(space.keys):
            s_decls.append(f"{space_name}['{k}'] = {any_name}[{i}]")

        if self.pgo_dict:
            if not self.pgo_header:
                # The function body is indented so the emitted helper is a
                # well-formed definition.
                self.pgo_header = (
                    "\n"
                    "@scope.define\n"
                    "def pgo_sample(pgo, sample):\n"
                    "    return pgo[sample]\n"
                )

            # use this to sort the pgo_labels by the unique ind
            # appended to the key.
            # This puts them in the order they appear in the hyperopt
            # expression, making it easier to read
            def last_num(kv):
                matches = re.search(r"(\d+)$", kv[0])
                return 0 if matches is None else int(matches.group(0))

            pgo_decls: List[str] = [
                f"pgo_{k} = {v.freq_dist.tolist()}"
                for k, v in sorted(self.pgo_dict.items(), key=last_num)
            ]
            if self.decls:
                self.decls += "\n"
            # Terminate the PGO block with a newline; without it the last
            # ``pgo_... = [...]`` line ran straight into the first object
            # declaration that follows.
            self.decls += "\n".join(pgo_decls) + "\n"

        self.decls += "\n".join(s_decls) + "\n"
        return space_name

    def visitSearchSpaceDict(
        self, sd: SearchSpaceDict, path: str, counter=None, useCounter=True
    ):
        """Render each entry of ``sd.space_dict`` as ``key:expr`` inside braces.

        NOTE(review): ``useCounter`` is not forwarded to the entries, so they
        are rendered with the visitors' default -- confirm this is intended.
        """
        search_spaces = (
            name + ":" + accept(space, self, path + "_" + name)
            for name, space in sd.space_dict.items()
        )
        return "{" + ",".join(search_spaces) + "}"

    def visitSearchSpaceProduct(
        self, prod: SearchSpaceProduct, path: str, counter=None, useCounter=True
    ):
        """Render each indexed sub-space of ``prod`` as a bracketed list.

        Each sub-space is visited with a unique path derived from its name
        and index; the incoming ``path`` and ``useCounter`` are not forwarded.
        """
        search_spaces = (
            accept(space, self, self.get_unique_name(make_indexed_name(name, index)))
            for name, index, space in prod.get_indexed_spaces()
        )
        return "[" + ",".join(search_spaces) + "]"

    def visitSearchSpaceSum(
        self, sum_space: SearchSpaceSum, path: str, counter=None, useCounter=True
    ):
        """Render a sum space as ``hp.choice`` over index/alternative pairs.

        Alternatives are visited with an empty path and rendered as
        ``"<index>" : "<expr>"`` entries.
        """
        unique_name: str = self.get_unique_name("choice")
        sub_str: Iterable[str] = (
            '"' + str(i) + '"' + " : " + '"' + accept(m, self, "") + '"'
            for i, m in enumerate(sum_space.sub_spaces)
        )
        sub_spaces_str: str = "[" + ",".join(sub_str) + "]"
        return f"hp.choice({unique_name}, {sub_spaces_str})"

    def visitSearchSpaceOperator(
        self, op: SearchSpaceOperator, path: str, counter=None, useCounter=True
    ):
        """Render ``op.sub_space`` wrapped in ``scope.make_nested_hyperopt``.

        Also records the header text that defines ``make_nested_hyperopt``
        the first time an operator is visited.
        """
        if not self.nested_header:
            # The function body is indented so the emitted helper is a
            # well-formed definition.
            self.nested_header = (
                "\n"
                "@scope.define\n"
                "def make_nested_hyperopt(space):\n"
                "    from lale.helpers import make_nested_hyperopt_space\n"
                "    return make_nested_hyperopt_space(space)\n"
            )
        inner = accept(op.sub_space, self, path, counter=counter, useCounter=useCounter)
        return f"scope.make_nested_hyperopt({inner})"

    def visitSearchSpaceEmpty(
        self, op: SearchSpaceEmpty, path: str, counter=None, useCounter=True
    ) -> str:
        """Render an empty search space as a fixed placeholder string."""
        return "***EMPTY**"