from typing import Dict
from requests import Session, Response
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from mindfoundry.optaas.client.version_check import compare_versions, CLIENT_VERSION
# Per-request timeout, in seconds, used as the default `timeout` argument of the request methods.
DEFAULT_TIMEOUT = 60

# HTTP 5xx status codes that are handed to urllib3's Retry as `status_forcelist`.
RETRY_STATUS_CODES = [500, 501, 502, 503, 504]

# Default retry budget (matches the documented `max_retries` default of 3).
DEFAULT_MAX_RETRIES = 3
class OPTaaSError(RuntimeError):
    """Wrapper class for an error :class:`.Response` received from OPTaaS.

    Args:
        response (Response): The error response to wrap.

    Attributes:
        status_code: The HTTP status code of the response.
        message: The ``'message'`` entry of the JSON body when it can be extracted,
            otherwise the raw body decoded as text (``''`` when the body is empty).
    """

    def __init__(self, response: Response) -> None:
        self.status_code = response.status_code
        try:
            self.message = response.json()['message']
        except Exception:  # pylint: disable=broad-except
            # Best effort: the body is not JSON, is not a JSON object, or has no 'message'.
            # Catch Exception (not BaseException) so KeyboardInterrupt/SystemExit still propagate.
            # Decode leniently so undecodable bytes cannot replace the HTTP error
            # with a UnicodeDecodeError.
            self.message = response.content.decode(errors='replace') if response.content else ''
        super().__init__(f"Status: {self.status_code} Message: {self.message}")
class OPTaaSResponse:
    """Wrapper class for a successful :class:`.Response` received from OPTaaS.

    Args:
        response (Response): The raw response to wrap.
        disable_version_check (bool, default False):
            When True, the response is not passed to ``compare_versions``.

    Attributes:
        body: The decoded JSON body of the response.

    Raises:
        :class:`.OPTaaSError` if the response is not ``ok``.
    """

    def __init__(self, response: Response, disable_version_check: bool = False) -> None:
        version_check_enabled = not disable_version_check
        if version_check_enabled:
            # The version check runs first, for error responses as well as successful ones.
            compare_versions(response)

        # Guard clause: anything that is not a success becomes an exception.
        if not response.ok:
            raise OPTaaSError(response)

        self.body = response.json()
class OPTaaSSession:
    """Wrapper class for a :class:`.Session` that makes requests to OPTaaS.

    Two sessions compare equal (and hash equal) when they are of the same class and
    were created with the same ``server_url``.

    Args:
        server_url (str): URL of your OPTaaS server
        api_key (str): Your personal API key
        disable_version_check (bool, default False):
            Set to True if you don't want to be notified when a new version of the client is available
        max_retries (int, default 3): How many times to retry a request if a connection error occurs
            or a response with one of the ``RETRY_STATUS_CODES`` is received.
        keep_alive (bool, default True): Whether to set the `Connection` HTTP Header to "keep-alive".
    """

    def __init__(self, server_url: str, api_key: str, disable_version_check: bool = False,
                 max_retries: int = DEFAULT_MAX_RETRIES, keep_alive: bool = True) -> None:
        self._session = self._make_session(server_url, max_retries)
        self._root_url = server_url
        # These headers are sent explicitly with every request made through this wrapper.
        self._headers = {
            'X-ApiKey': api_key,
            'User-Agent': 'PythonClient/' + CLIENT_VERSION,
            'Connection': 'keep-alive' if keep_alive else 'close'
        }
        self._disable_version_check = disable_version_check

    @staticmethod
    def _make_session(server_url: str, max_retries: int) -> Session:
        """Create a :class:`.Session` whose requests to `server_url` use a retrying adapter.

        Args:
            server_url (str): URL prefix the retrying adapter is mounted on.
            max_retries (int): Used for both the total and the connection retry limits.

        Returns:
            The configured :class:`.Session`.
        """
        session = Session()
        # allowed_methods=None: per the urllib3 documentation this lifts the HTTP-verb
        # restriction, so POST requests are retried as well.
        retry = Retry(total=max_retries, connect=max_retries, backoff_factor=0.5, allowed_methods=None,
                      status_forcelist=RETRY_STATUS_CODES)
        session.mount(server_url, HTTPAdapter(max_retries=retry))
        return session

    def post(self, endpoint: str, body: dict, timeout: float = DEFAULT_TIMEOUT) -> OPTaaSResponse:
        """Make a POST request to OPTaaS with a JSON body.

        Args:
            endpoint (str): Endpoint for the request (will be appended to the server_url).
            body (dict): Request body in JSON format.
            timeout (float, optional): Request timeout in seconds

        Returns:
            The :class:`.OPTaaSResponse` to the request.

        Raises:
            :class:`.OPTaaSError` if an error response is received.
        """
        response = self._session.post(self._root_url + endpoint, json=body, headers=self._headers, timeout=timeout)
        return OPTaaSResponse(response, disable_version_check=self._disable_version_check)

    def put(self, endpoint: str, body: dict, timeout: float = DEFAULT_TIMEOUT) -> OPTaaSResponse:
        """Make a PUT request to OPTaaS with a JSON body.

        Args:
            endpoint (str): Endpoint for the request (will be appended to the server_url).
            body (dict): Request body in JSON format.
            timeout (float, optional): Request timeout in seconds

        Returns:
            The :class:`.OPTaaSResponse` to the request.

        Raises:
            :class:`.OPTaaSError` if an error response is received.
        """
        response = self._session.put(self._root_url + endpoint, json=body, headers=self._headers, timeout=timeout)
        return OPTaaSResponse(response, disable_version_check=self._disable_version_check)

    def get(self, endpoint: str, query_params: Dict = None, timeout: float = DEFAULT_TIMEOUT) -> OPTaaSResponse:
        """Make a GET request to OPTaaS

        Args:
            endpoint (str): Endpoint for the request (will be appended to the server_url).
            query_params (Dict, optional): Query parameters in Dict format (will be appended to the url).
            timeout (float, optional): Request timeout in seconds

        Returns:
            The :class:`.OPTaaSResponse` to the request.

        Raises:
            :class:`.OPTaaSError` if an error response is received.
        """
        response = self._session.get(self._root_url + endpoint, headers=self._headers, params=query_params,
                                     timeout=timeout)
        return OPTaaSResponse(response, disable_version_check=self._disable_version_check)

    def delete(self, endpoint: str, timeout: float = DEFAULT_TIMEOUT) -> OPTaaSResponse:
        """Make a DELETE request to OPTaaS

        Args:
            endpoint (str): Endpoint for the request (will be appended to the server_url).
            timeout (float, optional): Request timeout in seconds

        Returns:
            The :class:`.OPTaaSResponse` to the request.

        Raises:
            :class:`.OPTaaSError` if an error response is received.
        """
        response = self._session.delete(self._root_url + endpoint, headers=self._headers, timeout=timeout)
        return OPTaaSResponse(response, disable_version_check=self._disable_version_check)

    def __eq__(self, other) -> bool:
        # The class check short-circuits, so `_root_url` is only read from another session.
        return self.__class__ == other.__class__ and self._root_url == other._root_url  # pylint: disable=protected-access

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; hash on the same fields that
        # __eq__ compares so that equal sessions hash equal.
        return hash((self.__class__, self._root_url))