Source code for mindfoundry.optaas.client.task

import sys
from typing import Any, List, Literal, Dict, Callable, Optional, Tuple, TypeVar, Union, TYPE_CHECKING

from loguru import logger
from mindfoundry.optaas.client.configuration import Configuration
from mindfoundry.optaas.client.user_defined_configuration import UserDefinedConfiguration
from mindfoundry.optaas.client.goal import Goal
from mindfoundry.optaas.client.prediction import Prediction
from mindfoundry.optaas.client.result import StoredResult, Result, ScoreValue, ScoreDict, ScoreValueOrDict, VarianceValueOrDict, \
    ScoringFunctionOutput
from mindfoundry.optaas.client.session import OPTaaSSession, OPTaaSResponse
from mindfoundry.optaas.client.utils import _pprint

if TYPE_CHECKING:  # pragma: no cover
    from pandas import DataFrame  # pylint: disable=unused-import

# Error text used when `best_first` is requested on a multi-objective Task.
_MULTI_OBJECTIVE_UNSUPPORTED_ERROR = (
    "best_first is not supported for multi-objective Tasks.\n"
    "Did you mean get_pareto_set?"
)

# Error text used when the Pareto set is requested on a single-objective Task.
_SINGLE_OBJECTIVE_UNSUPPORTED_ERROR = (
    "get_pareto_set is not supported for single-objective Tasks.\n"
    "Did you mean get_best_result_and_configuration?"
)

# Fragments appended to the "Running task ..." log line to describe the stopping criterion.
_NO_SCORE_THRESHOLD_MSG = "no score threshold set"

_UNTIL_SCORE_THRESHOLD_MSG = lambda score_threshold: "or until score is {} or better".format(score_threshold)

_UNTIL_SCORES_THRESHOLD_MSG = lambda score_threshold: "or until scores are {} or better".format(score_threshold)

T = TypeVar('T')

# Either a single value of type T or a list of such values.
MaybeList = Union[
    T,
    List[T],
]

# Logging levels accepted by `Task.run`.
LoggingLevel = Literal[
    "DEBUG",
    "INFO",
    "WARNING",
    "ERROR",
]

class Task:
    """Allows you to access Task attributes and perform all Task-related functions.

    Attributes:
        json (Dict): The full JSON representation of this Task in OPTaaS.
        id (str): Unique id for the Task.
        title (str): Name/description as provided when the Task was created.
        parameters (List[Dict]): JSON representation of the :class:`Parameters <.Parameter>` defined for this Task.
        constraints (List[str]): List of OPTaaS string representations of the
            :class:`Constraints <.Constraint>` defined for this Task.
        prior_means (List[str]): List of OPTaaS string representations of the
            :class:`Prior Means <.PriorMeanExpression>` defined for this Task.
        status (str): Current status of the Task, e.g. 'running' or 'done'
        number_of_iterations (int): Number of :class:`Results <.Result>` provided for this Task so far.
        user_defined_data (Any): Any other data as provided when the Task was created.
    """

    def __init__(self, json: dict, session: OPTaaSSession) -> None:
        self.json = json
        self.id = json['id']  # pylint: disable=invalid-name
        self.parameters = json['parameters']
        self.constraints = json['constraints']
        self.prior_means = json['priorMeans']
        self._update_attributes(json)

        self._session = session
        self._task_url = json['_links']['self']['href']
        self._configurations_url = json['_links']['configurations']['href']
        self._results_url = json['_links']['results']['href']
        self._complete_url = json['_links']['complete']['href']
        self._resume_url = json['_links']['resume']['href']
        self._predictions_url = json['_links']['predictions']['href']
        self._pareto_url = json['_links']['pareto']['href']
        self._targets_estimates_url = json['_links']['targets_estimates']['href']
        self._sqrt_crlb_url = json['_links']['sqrt_crlb']['href']
        # Id of the loguru handler installed by `_setup_logging_level`, if any.
        self._handler_id: Optional[int] = None

    def _update_attributes(self, json: dict):
        """Copy the mutable Task fields (title, status, iteration count, user data) from `json`."""
        self.title = json['title']
        self.status = json['status']
        self.number_of_iterations = json['numberOfIterations']
        self.user_defined_data = json.get('userDefined')

    def _setup_logging_level(self, logging_level: LoggingLevel) -> None:
        """Install a stderr loguru handler at `logging_level`, replacing any handler this Task added before."""
        # remove default handler
        try:
            logger.remove(0)
        except ValueError:
            # Handler 0 has already been removed.
            pass

        if self._handler_id is not None:
            logger.remove(self._handler_id)

        self._handler_id = logger.add(
            sys.stderr,
            filter=__name__.split(".", 1)[0],
            format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan> Task.{function}</cyan> | <level>{message}</level>",
            level=logging_level,
            colorize=True,
        )

    def run(self, scoring_function: Callable[..., ScoringFunctionOutput], max_iterations: int,
            score_threshold: Optional[ScoreValueOrDict] = None,
            logging_level: LoggingLevel = "INFO") -> Union[StoredResult, List[StoredResult]]:
        """Run this task, using the provided scoring function to calculate the score for each configuration.

        Args:
            scoring_function (Callable[..., ScoringFunctionOutput]):
                Function that takes configuration values as arguments, e.g. if you have parameters x and y,
                your function would be `def get_score(x, y)`.
                The function can return just a score, or a tuple of (score, variance).
                For multi-objective tasks the score and variance must be dictionaries, with Objective.id values
                as keys, e.g. `{"id1": 1.23, "id2": 4.56}`.
            max_iterations (int):
                Max number of iterations to run, i.e. number of results to record before stopping.
            score_threshold (ScoreValueOrDict, optional, defaults to min/max known score if defined):
                Stop running the task when the score is equal to or better than this value.
                For multi-objective tasks, use a dictionary with Objective.id values as keys,
                e.g. `{"id1": 1.23, "id2": 4.56}`.
            logging_level (Literal["DEBUG", "INFO", "WARNING", "ERROR"], optional, defaults to "INFO"):
                Set the logging level.

        Returns:
            The best recorded :class:`.Result` with the :class:`.Configuration` that was used to achieve it.
            For multi-objective tasks, the set of Pareto front Results will be returned instead.

        Raises:
            :class:`.OPTaaSError` if the server is unavailable.
        """
        self._setup_logging_level(logging_level)
        starting_msg = f'Running task "{self.title}" for {max_iterations} iterations'
        reached_threshold, msg = self._reached_threshold_if_defined(score_threshold)
        logger.info(f"{starting_msg} ({msg})\n")

        configuration = self.generate_configurations(1)[0]
        for i in range(max_iterations):
            scoring_result = scoring_function(**configuration.values)
            # The scoring function may return either `score` or `(score, variance)`.
            if isinstance(scoring_result, tuple):
                score, variance = scoring_result
                score_msg = f"Iteration: {i} | Score: {score} | Variance: {variance}"
            else:
                score = scoring_result
                variance = None  # type: ignore
                score_msg = f"Iteration: {i} | Score: {score}"

            logger.info(f"{score_msg} | Configuration: {configuration.values}")
            # Recording the result also yields the next configuration to evaluate.
            configuration = self.record_result(configuration=configuration, score=score, variance=variance)  # type: ignore
            if reached_threshold(score):  # type: ignore
                break

        return_value = self.get_pareto_set() if self.json.get('objectives') else self.get_best_result_and_configuration()
        self.complete()
        self.refresh()
        logger.info("Task Completed")
        return return_value  # type: ignore

    def _reached_threshold_if_defined(
            self, score_threshold: Optional[ScoreValueOrDict] = None
    ) -> Tuple[Union[Callable[[ScoreValue], bool], Callable[[ScoreDict], bool]], str]:
        """Build the stopping predicate for `run` together with a message describing it.

        If no threshold is given, the Task's known min/max score (depending on its goal) is used
        when present; otherwise the predicate never fires.

        Returns:
            A tuple `(predicate, message)`. `predicate(score)` is True once the threshold is reached.

        Raises:
            ValueError: if the Task is multi-objective and `score_threshold` is not a dictionary.
        """
        objectives = self.json.get('objectives')
        if objectives:
            if score_threshold is None or isinstance(score_threshold, dict):
                return self._reached_multi_objective_threshold_if_defined(objectives, score_threshold or {})
            raise ValueError("Score threshold must be a dictionary")

        goal = Goal[self.json['goal']]
        if score_threshold is None:
            score_threshold = self.json.get('minKnownScore' if goal == Goal.min else 'maxKnownScore')

        if score_threshold is None:
            return lambda score: False, _NO_SCORE_THRESHOLD_MSG

        # The message is returned for the caller to log, so nothing is printed here.
        message = _UNTIL_SCORE_THRESHOLD_MSG(score_threshold)
        if goal == Goal.min:
            return lambda score: score <= score_threshold, message
        return lambda score: score >= score_threshold, message

    @staticmethod
    def _reached_multi_objective_threshold_if_defined(
            objectives: List[Dict], score_threshold: ScoreDict
    ) -> Tuple[Callable[[ScoreDict], bool], str]:
        """Build the stopping predicate for a multi-objective Task together with a message describing it.

        Objectives missing from `score_threshold` fall back to their known min/max score when one
        is defined. The caller's dictionary is not modified.

        Returns:
            A tuple `(predicate, message)`. `predicate(scores)` is True once every threshold is reached.
        """
        goals = {objective['id']: Goal[objective['goal']] for objective in objectives}
        # Work on a copy so that defaults filled in below do not leak into the caller's dict.
        thresholds = dict(score_threshold)

        for objective in objectives:
            id_ = objective['id']
            if id_ not in thresholds:
                best_known_score = objective.get('minKnownScore' if goals[id_] == Goal.min else 'maxKnownScore')
                if best_known_score is not None:
                    thresholds[id_] = best_known_score

        if not thresholds:
            return lambda scores: False, _NO_SCORE_THRESHOLD_MSG

        def reached_all_thresholds(scores: ScoreDict) -> bool:
            for id_, threshold in thresholds.items():
                score = scores.get(id_)
                if score is None:
                    return False
                if goals[id_] == Goal.min:
                    if score > threshold:
                        return False
                elif score < threshold:
                    return False
            return True

        return reached_all_thresholds, _UNTIL_SCORES_THRESHOLD_MSG(thresholds)

    def refresh(self):
        """Make a GET request to OPTaaS to retrieve the latest Task data and update this object accordingly."""
        response = self._session.get(self._task_url)
        self._update_attributes(response.body)

    def delete(self):
        """Delete this Task (cannot be undone)."""
        self._session.delete(self._task_url)

    def generate_configurations(self, quantity: int = 1) -> List[Configuration]:
        """Make a POST request to OPTaaS to generate a set of new :class:`Configurations <.Configuration>` for this Task.

        Args:
            quantity (int, optional, default 1): The number of configurations to generate (minimum 1).

        Returns:
            A list of the newly created :class:`Configurations <.Configuration>`.

        Raises:
            :class:`.OPTaaSError` if the server is unavailable or the quantity is invalid.
        """
        response = self._session.post(self._configurations_url + f'?quantity={quantity}', {})
        return [Configuration(json) for json in response.body['configurations']]

    def add_user_defined_configuration(
            self,
            values: Optional[Dict] = None,
            score: Optional[ScoreValueOrDict] = None,
            variance: Optional[VarianceValueOrDict] = None,
            user_defined_data: Any = None,
            configurations: Optional[List[UserDefinedConfiguration]] = None
    ) -> MaybeList[Configuration]:
        """Make a POST request to OPTaaS to store a list of user-provided :class:`.Configuration` using the values provided.

        Also optionally store a list of :class:`.Result` for this Configuration using the provided `score`.

        This is useful for giving OPTaaS a "warm start" by providing some examples of good/bad Configurations.

        Optionally, it is possible to pass the input as single unwrapped values.

        Args:
            values (Dict, optional): Values assigned to each :class:`.Parameter`.
                See :attr:`.Configuration.values`.
            score (ScoreValueOrDict, optional): Score obtained when using this Configuration.
                For multi-objective tasks, a dictionary of scores for each objective, using Objective.id values
                as keys, e.g. `{"id1": 1.23, "id2": 4.56}`.
            variance (VarianceValueOrDict >=0, optional, defaults to 0): Variance associated with the score.
                For multi-objective tasks, a dictionary of variances for each objective, using Objective.id
                values as keys, e.g. `{"id1": 1.23, "id2": 4.56}`.
            user_defined_data (Any, optional, ignored if `score` is not provided):
                Any other data to store in the Result.
            configurations (List[UserDefinedConfiguration], optional): list of UserDefinedConfiguration.
                Either values or configurations should be provided.

        Returns:
            The newly created list of :class:`.Configuration`. In case a single unwrapped value was provided,
            it returns the unwrapped configuration.

        Raises:
            :class:`.OPTaaSError` if the values are otherwise invalid or the server is unavailable.
            :class:`.ValueError` if both or neither of `values` and `configurations` are provided.
        """
        if (values is None) == (configurations is None):
            raise ValueError("Either one of `values` or `configurations` should be provided, but not both.")

        is_unwrapped = configurations is None
        if is_unwrapped:
            configurations = [UserDefinedConfiguration(values, score, variance, user_defined_data)]  # type: ignore

        body = {"configurations": [c.get_body() for c in configurations]}  # type: ignore
        response = self._session.post(self._configurations_url, body)
        returned_configurations = [Configuration(c) for c in response.body['configurations']]

        if is_unwrapped:
            return returned_configurations[0]
        return returned_configurations

    def record_result(self, configuration: Configuration, score: Optional[ScoreValueOrDict] = None,
                      error: Optional[str] = None, variance: Optional[VarianceValueOrDict] = None,
                      user_defined_data: Any = None, return_next_config: bool = True) -> Optional[Configuration]:
        """Make a POST request to OPTaaS to record a :class:`.Result` for the given :class:`.Configuration`.

        Must specify **either** `score` **or** `error`.

        After the Result is recorded, OPTaaS will automatically generate the next Configuration for you to try.

        Args:
            configuration (Configuration): The Configuration used to obtain this Result.
            score (ScoreValueOrDict): The score obtained. For multi-objective tasks, a dictionary of scores for
                each objective, using Objective.id values as keys, e.g. `{"id1": 1.23, "id2": 4.56}`.
            error (Any): Any data related to an error encountered while calculating the score.
            variance (VarianceValueOrDict >=0, optional, defaults to 0): Variance associated with the score.
                For multi-objective tasks, a dictionary of variances for each objective, using Objective.id
                values as keys, e.g. `{"id1": 1.23, "id2": 4.56}`.
            user_defined_data (Any, optional): Any other data you wish to store as part of this Result.
            return_next_config (bool, optional, default True): Whether to return the next Configuration to try.

        Returns:
            If `return_next_config`==True, the next :class:`.Configuration` generated by OPTaaS, otherwise None.

        Raises:
            :class:`.OPTaaSError` if the data provided is invalid or the server is unavailable.
            :class:`.ValueError` if both score and error are provided, or neither.
        """
        result = Result(configuration=configuration, score=score, error=error, variance=variance,
                        user_defined_data=user_defined_data)
        body = result.to_json_without_configuration()
        body["return_next_config"] = return_next_config
        response = self._session.post(configuration.results_url, body)

        if return_next_config:
            return Configuration(response.body['nextConfiguration'])
        return None

    def record_results(self, results: List[Result], return_next_config: bool = True) -> List[Configuration]:
        """Make a POST request to OPTaaS to store a batch of :class:`Results <.Result>` and get the next batch of :class:`Configurations <.Configuration>`.

        Args:
            results (List[Result]): List of Results to record. Must be non-empty.
            return_next_config (bool, optional, default True): Whether to return the next Configurations to try.

        Returns:
            If `return_next_config`==True, a list of the same length as `results`, containing the next
            :class:`Configurations <.Configuration>` for you to try, otherwise an empty list.

        Raises:
            :class:`.OPTaaSError` if the data provided is invalid or the server is unavailable.
            :class:`.ValueError` if the result list is empty.
        """
        if not results:
            raise ValueError("Result list must be non-empty.")

        body = {'results': [result.to_json() for result in results], "return_next_config": return_next_config}
        response = self._session.post(self._results_url, body)

        # Mirror `record_result`: only read the next configurations when they were requested.
        if not return_next_config:
            return []
        return [Configuration(json) for json in response.body['nextConfigurations']]

    def get_configurations(self, limit: Optional[int] = None) -> List[Configuration]:
        """Make a GET request to OPTaaS to retrieve a list of :class:`Configurations <.Configuration>` for this Task.

        Args:
            limit (int, optional, minimum 1): Upper bound on the number of Configurations that will be returned.

        Returns:
            The list of :class:`Configurations <.Configuration>`.

        Raises:
            :class:`.OPTaaSError` if the limit is invalid or the server is unavailable.
        """
        url = self._configurations_url
        if limit is not None:
            url += f'?limit={limit}'
        response = self._session.get(url)
        return [Configuration(json) for json in response.body['configurations']]

    def get_configuration(self, configuration_id: str) -> Configuration:
        """Make a GET request to OPTaaS to retrieve a specific :class:`.Configuration` by id.

        Args:
            configuration_id (str): Unique id for the Configuration.

        Returns:
            The retrieved :class:`.Configuration`.

        Raises:
            :class:`.OPTaaSError` if the configuration_id is invalid or the server is unavailable.
        """
        url = f'{self._configurations_url}/{configuration_id}'
        response = self._session.get(url)
        return Configuration(response.body)

    def get_results(self, limit: Optional[int] = None, best_first: bool = False, as_dataframe: bool = False,
                    include_configurations=None) -> Union[List[StoredResult], 'DataFrame']:
        """Make a GET request to OPTaaS to retrieve a list of :class:`Results <.StoredResult>` for this Task.

        Args:
            limit (int, optional, minimum 1): Upper bound on the number of Results that will be returned.
            best_first (bool, optional, default False):
                If True, Results will appear in score order, with the best score first
                (not currently supported for multi-objective tasks).
                If False, Results will appear in the order they were created.
            as_dataframe (bool, optional, default False):
                Return the data as a Pandas DataFrame. It will include a column for each parameter,
                plus the score, variance and error from each Result.
            include_configurations: Deprecated.

        Returns:
            The list of :class:`Results <.StoredResult>` or a DataFrame.

        Raises:
            :class:`.OPTaaSError` if the limit is invalid or the server is unavailable.
            :class:`.ValueError` if `include_configurations` is passed, or if `best_first` is requested
            for a multi-objective task.
        """
        if include_configurations is not None:
            raise ValueError("include_configurations has been deprecated. Results will now always include the Configuration.")

        if best_first and self.json.get('objectives'):
            raise ValueError(_MULTI_OBJECTIVE_UNSUPPORTED_ERROR)

        url = self._results_url
        query_params: Dict[str, Any] = {}
        if limit is not None:
            query_params['limit'] = limit
        if best_first:
            query_params['order'] = 'bestFirst'

        response = self._session.get(url, query_params=query_params)
        results = self._make_stored_results(response)

        if as_dataframe:
            # `json_normalize` lives at the top level in current pandas; the `pandas.io.json`
            # location is only kept as a fallback for old releases that lack the top-level name.
            try:
                from pandas import json_normalize  # pylint: disable=import-outside-toplevel
            except ImportError:
                from pandas.io.json import json_normalize  # pylint: disable=import-outside-toplevel
            return json_normalize([result.as_pandas_row() for result in results])

        return results

    def _make_stored_results(self, response: OPTaaSResponse) -> List[StoredResult]:  # pylint: disable=no-self-use
        """Wrap each entry of `response.body['results']` in a :class:`.StoredResult`."""
        return [StoredResult(json) for json in response.body['results']]

    def get_result(self, result_id: int) -> StoredResult:
        """Make a GET request to OPTaaS to retrieve a specific :class:`.StoredResult` by id.

        Args:
            result_id (int): Unique id for the Result.

        Returns:
            The retrieved :class:`.StoredResult`.

        Raises:
            :class:`.OPTaaSError` if the result_id is invalid or the server is unavailable.
        """
        url = f'{self._results_url}/{result_id}'
        response = self._session.get(url)
        return StoredResult(response.body)

    def get_best_result_and_configuration(self) -> StoredResult:
        """Make a GET request to OPTaaS to retrieve the Result with the best score, including the Configuration used to obtain it.

        Not currently supported for multi-objective Tasks.

        Returns:
            The best :class:`.StoredResult` with included :class:`.Configuration`.

        Raises:
            :class:`.OPTaaSError` if the server is unavailable.
            :class:`.ValueError` if no results have been posted for this task.
        """
        results = self.get_results(best_first=True, limit=1)
        if results:
            return results[0]
        raise ValueError('No results available for this task yet')

    def get_targets_estimates(self):
        """Make a GET request to OPTaaS to retrieve the current estimates for the targets of this task.

        Not supported for tasks without targets.

        Returns:
            The current estimates for the targets of this task as dictionary from target id to estimate.

        Raises:
            :class:`.ValueError` if the task has no targets.
        """
        if not self.json.get('targets'):
            raise ValueError("Targets estimates is provided only for tasks with targets.")
        response = self._session.get(self._targets_estimates_url)
        return response.body["results"]

    def get_sqrt_crlb(self):
        """Make a GET request to OPTaaS to retrieve the square roots of the Cramer-Rao lower bounds of the estimates.

        https://en.wikipedia.org/wiki/Cram%C3%A9r%E2%80%93Rao_bound

        The value can be used as a proxy of the quality of the estimate.
        Not supported for tasks without targets.

        Returns:
            The square root of the CRLB for the targets of this task as dictionary from target id to sqrt_crlb.

        Raises:
            :class:`.ValueError` if the task has no targets.
        """
        if not self.json.get('targets'):
            raise ValueError("The square roots of the Cramer-Rao lower bounds are provided only for tasks with targets.")
        response = self._session.get(self._sqrt_crlb_url)
        return response.body["results"]

    def get_pareto_set(self) -> List[StoredResult]:
        """Make a GET request to OPTaaS to retrieve the set of Pareto front Results for a multi-objective Task.

        These are the Results where, for each objective, the score cannot be improved without reducing the score
        for another objective.

        Not supported for single-objective Tasks.

        Note: if a Result doesn't contain a score value for all Objectives, it will be excluded from the Pareto set.

        Returns:
            The list of Pareto front Results.

        Raises:
            :class:`.OPTaaSError` if the server is unavailable.
            :class:`.ValueError` if the task is not multi-objective.
        """
        if not self.json.get('objectives'):
            raise ValueError(_SINGLE_OBJECTIVE_UNSUPPORTED_ERROR)
        response = self._session.get(self._pareto_url)
        return self._make_stored_results(response)

    def get_surrogate_predictions(self, configurations: List[Dict]) -> List[Prediction]:
        """Make a POST request to OPTaaS to retrieve the surrogate prediction for some configurations

        Args:
            configurations (List[Dict]): key value pairs corresponding to parameter names and their values.
                See :attr:`.Configuration.values`.

        Returns:
            :List[Prediction]: a list of objects, each containing the mean and variance of the surrogate at each
            corresponding configuration point. For multi-objective Tasks, the mean and variance are dictionaries
            containing mean (and variance respectively) for each objective.

        Raises:
            :class:`.OPTaaSError` if the configurations are invalid or the server is unavailable
        """
        body = {'configurations': [{'values': configuration} for configuration in configurations]}
        response = self._session.post(self._predictions_url, body)
        predictions = response.body['predictions']
        return [Prediction(json=prediction) for prediction in predictions]

    def complete(self):
        """Make a PUT request to OPTaaS to complete the task (no further configurations or results can be created)"""
        self._session.put(self._complete_url, {})

    def resume(self):
        """Make a PUT request to OPTaaS to resume a completed task"""
        self._session.put(self._resume_url, {})

    def __eq__(self, other):
        return self.__class__ == other.__class__ and self.__dict__ == other.__dict__

    def __repr__(self):
        return _pprint(self, 'id', 'title', 'user_defined_data', 'parameters', 'constraints')