"""Source code for ``mindfoundry.optaas.client.sklearn_pipelines.sklearn_task``."""

import copy
from typing import List, Dict, Callable, Union, TYPE_CHECKING

from sklearn import clone
from sklearn.pipeline import Pipeline

from mindfoundry.optaas.client.configuration import Configuration
from mindfoundry.optaas.client.result import StoredResult, ScoringFunctionOutput, ScoreValueOrDict
from mindfoundry.optaas.client.session import OPTaaSResponse
from mindfoundry.optaas.client.sklearn_pipelines.mixin import EstimatorChoice, OptimizablePipeline
from mindfoundry.optaas.client.sklearn_pipelines.utils import EstimatorTuple, Estimator
from mindfoundry.optaas.client.task import Task, LoggingLevel
from mindfoundry.optaas.client.utils import move_dict_value_up_one_level, get_first_key

if TYPE_CHECKING:  # pragma: no cover
    from pandas import DataFrame  # pylint: disable=unused-import


class SklearnTask(Task):
    """A Task that can convert a :class:`.Configuration` into a sklearn :class:`.Pipeline`"""

    def __init__(self, task: Task, estimators: List[EstimatorTuple]) -> None:
        # Wrap an existing Task, remembering the (name, estimator) tuples so that
        # configurations produced by the server can be turned back into pipelines.
        self._estimators = estimators
        super().__init__(task.json, task._session)  # pylint: disable=protected-access

    def run(self, scoring_function: Callable[[Pipeline], ScoringFunctionOutput], max_iterations: int,
            score_threshold: ScoreValueOrDict = None,
            logging_level: LoggingLevel = "INFO") -> Union[StoredResult, List[StoredResult]]:
        """Run this task, using the provided scoring function to calculate the score for each configuration.

        Args:
            scoring_function (Callable[[Pipeline], ScoringFunctionOutput]):
                Function that takes a sklearn Pipeline as input and returns a score
                or a tuple of (score, variance).
            max_iterations (int):
                Max number of iterations to run, i.e. number of results to record before stopping.
            score_threshold (float, optional):
                Stop running the task when the score is equal to or better than this value.
            logging_level (Literal["DEBUG", "INFO", "WARNING", "ERROR"], optional, defaults to "INFO"):
                Set the logging level.

        Returns:
            The best recorded :class:`.Result` with the :class:`.Pipeline` that was used to achieve it.
            For multi-objective tasks, the set of Pareto front Results will be returned instead.

        Raises:
            :class:`.OPTaaSError` if the server is unavailable.
        """

        # Adapter closure: the base Task.run supplies configuration values as
        # keyword arguments; build the sklearn Pipeline from the 'pipeline'
        # entry and hand it to the user's scoring function.
        def calculate_score(**configuration_values):
            pipeline = self._make_pipeline(configuration_values['pipeline'])
            return scoring_function(pipeline)

        return super().run(scoring_function=calculate_score, max_iterations=max_iterations,
                           score_threshold=score_threshold, logging_level=logging_level)

    def _make_stored_results(self, response: OPTaaSResponse) -> List[StoredResult]:
        # Augment each stored result with the Pipeline reconstructed from its
        # Configuration, so callers can inspect/refit the winning pipeline.
        results = super()._make_stored_results(response)
        for result in results:
            if isinstance(result.configuration, Configuration):
                result.pipeline = self.make_pipeline(result.configuration)
        return results

    def make_pipeline(self, configuration: Configuration) -> Pipeline:
        """Creates a sklearn :class:`.Pipeline` and sets its parameters based on the provided :class:`.Configuration`"""
        return self._make_pipeline(configuration.values['pipeline'])

    def _make_pipeline(self, pipeline_values: Dict) -> Pipeline:
        # Deep-copy the configuration values because the flattening helpers
        # below mutate the dict in place.
        configuration_values = copy.deepcopy(pipeline_values)
        # NOTE(review): assumes the first task parameter is the pipeline group
        # and its 'items' describe the per-step parameters — confirm against
        # how the task parameters are generated.
        pipeline_parameters = self.parameters[0]['items']
        return self._make_pipeline_from_values(configuration_values, pipeline_parameters, self._estimators)

    def _make_pipeline_from_values(self, values: Dict, parameters: List[Dict], estimators: List[EstimatorTuple]):
        # Build a Pipeline from the subset of estimators that appear in the
        # configured values (steps absent from `values` are skipped, e.g.
        # optional steps that were not selected).
        new_estimators = []
        for name, estimator in estimators:
            if name in values:
                # Find the parameter definition matching this step by name.
                estimator_parameter = next(p for p in parameters if p['name'] == name)
                new_estimator = self._make_estimator(name, estimator, values, estimator_parameter)
                new_estimators.append((name, new_estimator))
        return Pipeline(new_estimators)

    def _make_estimator(self, name: str, estimator: Estimator, values: Dict, estimator_parameter: Dict):
        # Nested optimizable pipeline: recurse to build a sub-Pipeline.
        if isinstance(estimator, OptimizablePipeline):
            return self._make_pipeline_from_values(values[name], estimator_parameter['items'], estimator.estimators)
        # Choice between alternative estimators: the chosen branch is encoded
        # as the first key of values[name] (an integer index as a string).
        if isinstance(estimator, EstimatorChoice):
            choice = get_first_key(values[name])
            choice_index = int(choice)
            # Hoist the chosen branch's values up one level, then recurse on
            # the selected estimator and its matching parameter definition.
            move_dict_value_up_one_level(values, name)
            estimator = estimator.estimators[choice_index]
            estimator_parameter = estimator_parameter['choices'][choice_index]
            return self._make_estimator(name, estimator, values, estimator_parameter)
        # Plain estimator: flatten the configured values into a shape that
        # set_params accepts, then clone so the template estimator is untouched.
        _flatten_values(values[name], estimator_parameter)
        cloned_estimator = clone(estimator)
        cloned_estimator.set_params(**values[name])
        return cloned_estimator
def _flatten_values(values: Dict, parameter_group: Dict) -> None: for parameter in parameter_group['items']: name = parameter['name'] if name in values: if parameter['type'] == 'choice': move_dict_value_up_one_level(values, name) elif parameter['type'] == 'group': value: Dict = values.get(name) # type: ignore group_values = list(value.values()) if len(group_values) == 1 and isinstance(group_values[0], Dict): values[name] = group_values[0] else: values[name] = group_values else: values[name] = None