# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
from enum import Enum
from typing import (
    Any,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
)

import jsonschema
import numpy as np

Freqs = Dict[str, int]
PGO = Dict[str, Dict[str, Freqs]]


class DefaultValue(Enum):
    token = 0


_default_value = DefaultValue.token

Def = TypeVar("Def")
Defaultable = Union[DefaultValue, Def]

XDK = TypeVar("XDK")
XDV = TypeVar("XDV")


def remove_defaults_dict(d: Dict[XDK, Union[DefaultValue, XDV]]) -> Dict[XDK, XDV]:
    ret: Dict[XDK, XDV] = {}
    for k, v in d.items():
        if v is not _default_value:
            assert not isinstance(v, DefaultValue)
            # not sure why pyright can't figure this out
            ret[k] = v  # type: ignore
    return ret
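

# A minimal sketch of how remove_defaults_dict behaves; the parameter names
# below are illustrative, not taken from a real PGO file:
#
#   >>> remove_defaults_dict({"n_estimators": 100, "max_depth": _default_value})
#   {'n_estimators': 100}
#
# Entries mapped to the default-value sentinel are dropped; all other entries
# are kept unchanged.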


# utilities to load a pgo from json-ish
def load_pgo_file(filepath) -> PGO:
    with open(filepath) as json_file:  # pylint:disable=unspecified-encoding
        json_data = json.load(json_file)
        return load_pgo_data(json_data)


def load_pgo_data(json_data) -> PGO:
    jsonschema.validate(json_data, _input_schema, jsonschema.Draft4Validator)
    norm = normalize_pgo_type(json_data)
    return norm
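

# A minimal sketch of the expected input shape, using a hypothetical
# algorithm and parameter name (not from a real PGO file):
#
#   >>> pgo = load_pgo_data(
#   ...     {"MyClassifier": {"max_depth": {"3": 10, "5": "7", "default": 2}}}
#   ... )
#   >>> pgo["MyClassifier"]["max_depth"]["5"]
#   7
#
# String-typed counts (permitted by _input_schema) are coerced to int by
# normalize_pgo_type.
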
# TODO: Add support for falling back on an underlying distribution
# with some probability
T = TypeVar("T")


class FrequencyDistribution(Generic[T]):
    """Represents the distribution implied by a histogram"""

    freq_dist: np.ndarray  # Array[T,int]
    vals: np.ndarray  # Array[T]
    cumulative_freqs: np.ndarray  # Array[int]

    @classmethod
    def asIntegerValues(
        cls,
        freqs: Iterable[Tuple[Any, int]],
        inclusive_min: Optional[float] = None,
        inclusive_max: Optional[float] = None,
    ) -> "FrequencyDistribution[int]":
        freqs = freqsAsIntegerValues(
            freqs, inclusive_min=inclusive_min, inclusive_max=inclusive_max
        )
        return FrequencyDistribution[int](list(freqs), dtype=int)

    @classmethod
    def asFloatValues(
        cls,
        freqs: Iterable[Tuple[Any, int]],
        inclusive_min: Optional[float] = None,
        inclusive_max: Optional[float] = None,
    ) -> "FrequencyDistribution[float]":
        freqs = freqsAsFloatValues(
            freqs, inclusive_min=inclusive_min, inclusive_max=inclusive_max
        )
        return FrequencyDistribution[float](list(freqs), dtype=float)

    @classmethod
    def asEnumValues(
        cls, freqs: Iterable[Tuple[Any, int]], values: List[Any]
    ) -> "FrequencyDistribution[Any]":
        freqs = freqsAsEnumValues(freqs, values=values)
        return FrequencyDistribution[Any](list(freqs), dtype=object)

    def __init__(self, freqs: Iterable[Tuple[Defaultable[T], int]], dtype=object):
        # we need them to be sorted for locality
        sorted_freq_list = sorted(
            freqs,
            key=(
                lambda k: (
                    k[0] is _default_value,
                    None if k[0] is _default_value else k[0],
                )
            ),
        )
        freqs_array = np.array(
            sorted_freq_list, dtype=[("value", object), ("frequency", int)]
        )
        # freqs_array.sort(order='value')
        self.freq_dist = freqs_array
        self.vals = freqs_array["value"]
        self.cumulative_freqs = np.cumsum(freqs_array["frequency"])

    def __len__(self) -> int:
        return cast(int, np.int_(self.cumulative_freqs[-1]))

    @overload
    def __getitem__(self, key: int) -> T: ...

    @overload
    def __getitem__(self, key: Sequence[int]) -> Sequence[T]: ...

    @overload
    def __getitem__(self, key: slice) -> Sequence[T]: ...

    def __getitem__(
        self, key: Union[int, Sequence[int], slice]
    ) -> Union[T, Sequence[T]]:
        indices: Sequence[int]
        single = False
        if isinstance(key, (int, float)):
            single = True
            indices = [key]
        elif isinstance(key, slice):
            # TODO: this could be made more efficient
            indices = range(key.start or 0, key.stop or len(self), key.step or 1)
        else:
            indices = key
        val_indices = np.searchsorted(self.cumulative_freqs, indices, side="right")
        values = self.vals[val_indices].tolist()
        if single:
            assert len(values) == 1
            return values[0]
        else:
            return values
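
    # A minimal sketch of the cumulative-frequency lookup above, with
    # illustrative numbers: for frequencies [("a", 3), ("b", 1)],
    # cumulative_freqs is [3, 4], and np.searchsorted(..., side="right")
    # maps raw indices 0, 1, 2 -> "a" and 3 -> "b", so each value occupies
    # a block of indices proportional to its frequency.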

    def sample(self) -> T:
        ll = len(self)
        # This choice does not need to be cryptographically secure or hard to predict
        i = random.randrange(ll)  # nosec
        return self[i]

    def samples(self, count: int) -> Sequence[T]:
        ll = len(self)
        # This choice does not need to be cryptographically secure or hard to predict
        i: Sequence[int] = [random.randrange(ll) for _ in range(count)]  # nosec
        return self[i]
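

# A minimal sketch of sampling from a FrequencyDistribution, with
# illustrative frequencies (not from a real PGO file):
#
#   >>> dist = FrequencyDistribution.asIntegerValues([("1", 3), ("2", 1)])
#   >>> dist.sample() in (1, 2)
#   True
#   >>> len(dist.samples(5))
#   5
#
# Values are drawn proportionally to their frequencies, so 1 is sampled
# roughly three times as often as 2.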


# utilities to convert and sample from a PGO frequency distribution
DEFAULT_STR = "default"


def freqsAsIntegerValues(
    freqs: Iterable[Tuple[Any, int]],
    inclusive_min: Optional[float] = None,
    inclusive_max: Optional[float] = None,
) -> Iterator[Tuple[Defaultable[int], int]]:
    """maps the str values to integers, and skips anything that does not look like an integer"""
    for v, f in freqs:
        try:
            if v == DEFAULT_STR:
                yield _default_value, f
                continue
            i = int(v)
            if inclusive_min is not None and inclusive_min > i:
                continue
            if inclusive_max is not None and inclusive_max < i:
                continue
            yield i, f
        except ValueError:
            pass


def freqsAsFloatValues(
    freqs: Iterable[Tuple[Any, int]],
    inclusive_min: Optional[float] = None,
    inclusive_max: Optional[float] = None,
) -> Iterator[Tuple[Defaultable[float], int]]:
    """maps the str values to floats, and skips anything that does not look like a float"""
    for v, f in freqs:
        try:
            if v == DEFAULT_STR:
                yield _default_value, f
                continue
            i = float(v)
            if inclusive_min is not None and inclusive_min > i:
                continue
            if inclusive_max is not None and inclusive_max < i:
                continue
            yield i, f
        except ValueError:
            pass


# TODO: we can get a dictionary from freqs (before items() was called)
# and then look up values in it (since values is likely smaller than freqs)
# or, of course, check which one is smaller and iterate through it
def freqsAsEnumValues(
    freqs: Iterable[Tuple[Any, int]], values: List[Any]
) -> Iterator[Tuple[Defaultable[Any], int]]:
    """Only keeps entries that match the string representation of a value in the enumeration,
    converting from the string to the value as represented in the enumeration.
    """

    def as_str(v) -> str:
        """There are some quirks in how the PGO files
        encode values relative to python's str method
        """
        if v is None:
            return "none"
        elif v is True:
            return "true"
        elif v is False:
            return "false"
        else:
            return str(v)

    value_lookup = {as_str(k): k for k in values}
    for v, f in freqs:
        if v == DEFAULT_STR:
            yield _default_value, f
            continue
        if v in value_lookup:
            yield value_lookup[v], f
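

# A minimal sketch of the enum mapping, with hypothetical frequencies:
#
#   >>> list(freqsAsEnumValues([("true", 4), ("gini", 2), ("bogus", 1)],
#   ...                        values=[True, "gini"]))
#   [(True, 4), ('gini', 2)]
#
# "bogus" is dropped because it matches no allowed value's string form,
# while "true" maps back to the Python value True.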

_input_type = Dict[str, Dict[str, Union[int, Dict[str, Union[str, int]]]]]


# For now, we skip things of the form
# alg -> {default: number}
# (i.e. without parameters)
def normalize_pgo_type(data: _input_type) -> PGO:
    return {
        alg: {
            param_keys: {
                param_values: int(param_counts)
                for param_values, param_counts in v2.items()
            }
            for param_keys, v2 in v1.items()
            if isinstance(v2, dict)
        }
        for alg, v1 in data.items()
    }
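

# A minimal sketch of the normalization, with hypothetical input: string
# counts are coerced to int, and entries of the form alg -> {"default": n}
# (i.e., without parameters) are dropped, leaving an empty parameter dict:
#
#   >>> normalize_pgo_type({"Alg": {"p": {"a": "2"}}, "NoParams": {"default": 5}})
#   {'Alg': {'p': {'a': 2}}, 'NoParams': {}}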


_input_schema: Any = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "description": "Input format for pgo files. Keys are the name of the algorithm",
    "type": "object",
    "additionalProperties": {
        "anyOf": [
            {
                "description": "Keys are the parameter names",
                "type": "object",
                "additionalProperties": {
                    "description": "Keys are value names",
                    "type": "object",
                    "additionalProperties": {
                        "anyOf": [
                            {
                                "description": "the number of times this value was found",
                                "type": "integer",
                            },
                            {
                                "description": "the number of times this value was found",
                                "type": "string",
                            },
                        ]
                    },
                },
            },
            {
                "description": "default value for the optimizer",
                "type": "object",
                "additionalProperties": False,
                "required": ["default"],
                "properties": {
                    "default": {
                        "anyOf": [
                            {
                                "description": "the number of times the default was found",
                                "type": "integer",
                            },
                            {
                                "description": "the number of times the default was found",
                                "type": "string",
                            },
                        ]
                    }
                },
            },
        ]
    },
}