# XrDriver.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, requests, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
#from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service
import json

LOGGER = logging.getLogger(__name__)

class XrDriver(_Driver):
    """Driver that authenticates against a "CM" endpoint over HTTPS.

    ``Connect`` requests an OAuth access token from ``<cm_root>/oauth/token``.
    Configuration handling is still a development stub: the endpoints reported
    by ``GetConfig`` are fabricated locally and the services handed to
    ``SetConfig`` / ``DeleteConfig`` are only tracked in an in-memory
    dictionary; nothing is sent to the CM yet.
    """

    def __init__(self, address: str, port: int, **settings) -> None:    # pylint: disable=super-init-not-called
        """Store connection parameters; no network traffic happens here.

        Args:
            address: Host name or IP address of the CM.
            port: TCP port of the CM.
            **settings: Optional settings. Recognised keys are ``timeout``
                (seconds, default 120), ``audience`` (default ``"test"``)
                and ``client_id`` (default ``"test"``).
        """
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__cm_root = 'https://' + address + ':' + str(port)
        self.__timeout = int(settings.get('timeout', 120))
        self.__verify = False  # Currently using self signed certificates
        self.__audience = settings.get("audience", "test")
        self.__client_id = settings.get("client_id", "test")

        # Populated by Connect() once a token has been obtained.
        self.__access_token: Optional[str] = None
        self.__cm_http_headers: dict = {}

        # In-memory store used by the dummy SetConfig/DeleteConfig: key -> config.
        # It must exist before either of them is called.
        self.__services: dict = {}

        # FIXME: remove
        LOGGER.info(f"FIXME!!! XrDriver, cm {address}:{port}, {settings=}")

    def Connect(self) -> bool:
        """Obtain an access token from the CM.

        Returns:
            True when a token was obtained (or the driver was already
            started); False on timeout, unparsable response, missing token
            or any other error. Errors are logged, never raised.
        """
        url = self.__cm_root + '/oauth/token'
        with self.__lock:
            if self.__started.is_set():
                return True
            try:
                # TODO: could also do get: https://${HOSTNAME}:443/oauth/token?client_id=test&audience=test"
                req = {
                    "grant_type": "client_credentials",
                    "client_id": self.__client_id,
                    "audience": self.__audience,
                }
                response = requests.post(url, data=req, timeout=self.__timeout, verify=self.__verify)
                resp = json.loads(response.text)
                if 'access_token' in resp:
                    self.__access_token = resp['access_token']
                    # Never write the token itself to the log: it is a bearer credential.
                    LOGGER.info('CM connected, {:s}'.format(str(self.__cm_root)))

                    # Use in subsequent requests as named argument headers=self.__cm_http_headers
                    self.__cm_http_headers = {'Authorization': 'Bearer ' + self.__access_token}
                else:
                    # No exception is active here, so use error() rather than exception().
                    LOGGER.error('No access token provided by {:s}'.format(str(self.__cm_root)))
                    return False
            except requests.exceptions.Timeout:
                LOGGER.exception('Timeout connecting {:s}'.format(str(self.__cm_root)))
                return False
            except json.JSONDecodeError as json_err:
                LOGGER.exception(f"Exception parsing JSON access token from {str(self.__cm_root)}, {str(json_err)}")
                return False
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception('Exception connecting {:s}'.format(str(self.__cm_root)))
                return False
            else:
                self.__started.set()
                return True

    def Disconnect(self) -> bool:
        """Set the internal terminate event; always returns True."""
        with self.__lock:
            self.__terminate.set()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return the initial configuration, which is currently always empty."""
        with self.__lock:
            return []

    def fake_interface_names(self) -> List[str]:
        """Build the placeholder interface names used while developing.

        Returns:
            ``HUB-LANE-00`` .. ``HUB-LANE-03`` followed by
            ``LEAF-01-LANE-00`` .. ``LEAF-04-LANE-03``, in that order.
        """
        # Using 4 as max leaf and lane to keep prints small during development
        interfaces = [f"HUB-LANE-{lane:02}" for lane in range(0, 4)]
        interfaces.extend(
            f"LEAF-{leaf:02}-LANE-{lane:02}"
            for leaf in range(1, 5)
            for lane in range(0, 4)
        )
        return interfaces

    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Return one endpoint entry per fake interface.

        Args:
            resource_keys: Must be a list; it is type-checked but otherwise
                ignored for now. The default list is never mutated.

        Returns:
            A list of ``(resource_key, endpoint_dict)`` tuples.
        """
        chk_type('resources', resource_keys, list)

        # TODO: Completely fake interface information until we get same info from CM
        results = [
            (f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}})
            for ifname in self.fake_interface_names()
        ]
        return results

    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Remember each ``(key, config)`` pair in the in-memory service store.

        Args:
            resources: ``(resource_key, config)`` tuples to store.

        Returns:
            One ``True`` per resource, in input order.
        """
        LOGGER.info(f"FIXME!!! XrDriver, SetConfig {resources=}")

        # Logged config seems like:
        #[('/service[44ca3570-4e1a-49b5-8aab-06c92f239fab:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "HUB-LANE-01", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "LEAF-02-LANE-01", "uuid": "44ca3570-4e1a-49b5-8aab-06c92f239fab:optical"}')]
        results = []
        if len(resources) == 0:
            return results

        # Temporary dummy version
        for key, config in resources:
            self.__services[key] = config

            # TODO: config to CM
            # Ignore "direction=UNIDIRECTIONAL", it seems that controller only creates one direction...
            results.append(True)

        return results

    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Forget previously stored services.

        Args:
            resources: ``(resource_key, config)`` tuples; only the key is used.

        Returns:
            For each resource, ``True`` if the key was stored and has been
            removed, ``False`` if the key was unknown.
        """
        LOGGER.info(f"FIXME!!! XrDriver, DeleteConfig {resources=}")

        results = []
        if len(resources) == 0:
            return results

        # Temporary dummy version
        for key, _config in resources:
            # Membership is tested on the store itself, not on the stored value.
            if key in self.__services:
                del self.__services[key]
                # TODO: Delete config from CM
                results.append(True)
            else:
                results.append(False)

        return results

    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Subscriptions are not supported: return ``False`` for each request."""
        # Not supported
        return [False for _ in subscriptions]

    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Subscriptions are not supported: return ``False`` for each request."""
        # Not supported
        return [False for _ in subscriptions]

    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """State streaming is not supported: always returns an empty list."""
        # Not supported
        return []