# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging, requests, threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver from . import ALL_RESOURCE_KEYS #from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service import json LOGGER = logging.getLogger(__name__) class XrDriver(_Driver): def __init__(self, address: str, port: int, **settings) -> None: # pylint: disable=super-init-not-called self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() self.__cm_root = 'https://' + address + ':' + str(port) self.__timeout = int(settings.get('timeout', 120)) self.__verify = False; # Currently using self signed certificates self.__audience = settings["audience"] if "audience" in settings else "test" self.__client_id = settings["client_id"] if "client_id" in settings else "test" # FIXME: remove LOGGER.info(f"FIXME!!! XrDriver, cm {address}:{port}, {settings=}"); def Connect(self) -> bool: url = self.__cm_root + '/oauth/token' with self.__lock: if self.__started.is_set(): return True try: # TODO: could also do get: https://${HOSTNAME}:443/oauth/token?client_id=test&audience=test" req = {"grant_type":"client_credentials","client_id": self.__client_id, "audience": self.__audience} response = requests.post(url,data=req,timeout=self.__timeout,verify=self.__verify) resp = json.loads(response.text) if 'access_token' in resp: self.__access_token=resp['access_token'] LOGGER.info(f"FIXME!!! CM connected, {self.__access_token=}") ## TODO: remove # Use in subsequend requests as named argument headers=self.__cm_http_headers self.__cm_http_headers = {'Authorization': 'Bearer '+ self.__access_token} else: LOGGER.exception('No access token provided by {:s}'.format(str(self.__cm_root))) return False except requests.exceptions.Timeout: LOGGER.exception('Timeout connecting {:s}'.format(str(self.__cm_root))) return False except json.JSONDecodeError as json_err: LOGGER.exception(f"Exception parsing JSON access token from {str(self.__cm_root)}, {str(json_err)}") return False except Exception: # pylint: disable=broad-except LOGGER.exception('Exception connecting {:s}'.format(str(self.__cm_root))) return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() return True def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) results = [] # TODO return results def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results # TODO return results def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results #TODO return results def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # Not supported return [False for _ in subscriptions] def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # Not supported return [False for _ in subscriptions] def GetState( self, blocking=False, terminate : Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # Not supported return []