# XrDriver.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging
import threading
import json
from typing import Any, Iterator, List, Optional, Tuple, Union
import urllib3
from common.type_checkers.Checkers import chk_type
from device.service.driver_api._Driver import _Driver
from .cm.cm_connection import CmConnection
from .cm import tf

# Don't complain about non-verified SSL certificate. This driver is demo only
# and CM is not provisioned in demos with a proper certificate.
urllib3.disable_warnings()

LOGGER = logging.getLogger(__name__)

class XrDriver(_Driver):
    """Device driver that configures services on an XR constellation through a CM connection.

    The constellation is looked up in CM by the hub module name given in the
    driver settings and is cached on the instance once it has been found.
    """

    def __init__(self, address: str, port: int, **settings) -> None:    # pylint: disable=super-init-not-called
        """Create the driver and its (not yet connected) CM connection.

        Args:
            address: Address of the CM server.
            port: Port of the CM server; converted with int().
            **settings: Driver settings. "hub_module_name" is mandatory;
                "timeout" (default 120), "username" and "password"
                (both default "xr-user-1") are optional.

        Raises:
            KeyError: If "hub_module_name" is missing from settings.
        """
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__timeout = int(settings.get('timeout', 120))
        # Mandatory key, an exception will get thrown if missing
        self.__hub_module_name = settings["hub_module_name"]

        tls_verify = False # Currently using self signed certificates
        username = settings.get("username", "xr-user-1")
        password = settings.get("password", "xr-user-1")
        self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify)
        self.__constellation = None

        # Never write the clear-text password to the log.
        logged_settings = {name: ("<redacted>" if name == "password" else value) for name, value in settings.items()}
        LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, settings={logged_settings}")

    def __ensure_constellation(self):
        """Return the cached constellation, looking it up in CM by hub module name when not cached.

        Returns whatever the CM lookup returned when nothing was cached; callers
        treat None as "no constellation available".
        """
        if self.__constellation is None:
            self.__constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
        return self.__constellation

    def Connect(self) -> bool:
        """Connect to CM unless already started.

        Returns:
            True if the driver was already started or the CM connection
            succeeded, False if the CM connection failed.
        """
        with self.__lock:
            if self.__started.is_set():
                return True
            if not self.__cm_connection.Connect():
                return False
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Set the terminate event; always returns True."""
        with self.__lock:
            self.__terminate.set()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return the initial configuration, which is always empty for this driver."""
        with self.__lock:
            return []

    #pylint: disable=dangerous-default-value
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Return one optical endpoint entry per interface name of the constellation.

        The constellation is fetched from CM on every call and cached on
        success. resource_keys is type-checked but otherwise not used for
        filtering.

        Returns:
            A list of ("/endpoints/endpoint[<ifname>]", endpoint-dict) tuples,
            or an empty list when no constellation was found.
        """
        chk_type('resources', resource_keys, list)

        constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
        if not constellation:
            # Keep the declared list return type instead of implicitly returning None.
            return []

        self.__constellation = constellation
        return [
            (f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}})
            for ifname in constellation.ifnames()
        ]

    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Apply the configuration of each (key, JSON config) resource.

        Returns:
            One entry per resource, in order: the result of
            tf.set_config_for_service() for a valid service key, False when
            the key yields no service UUID or no constellation is available,
            or the exception raised while decoding the JSON config.
        """
        LOGGER.info(f"SetConfig {resources=}")

        # Logged config seems like:
        #[('/service[52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "XR HUB 1|XR-T4", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR LEAF 1|XR-T1", "uuid": "52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical"}')]

        if not resources:
            return []

        constellation = self.__ensure_constellation()
        if constellation is None:
            LOGGER.error("SetConfig: no valid constellation")
            return [False] * len(resources)

        results = []
        for key, config in resources:
            service_uuid = self.__cm_connection.service_uuid(key)
            if not service_uuid:
                results.append(False)
                continue

            try:
                config = json.loads(config)
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError is a ValueError. Report the failure for this
                # resource only instead of aborting the whole batch.
                LOGGER.error(f"SetConfig: invalid JSON config for {key}: {e}")
                results.append(e)
                continue

            results.append(tf.set_config_for_service(self.__cm_connection, constellation, service_uuid, config))

        return results

    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Delete the CM connection (and, in VTI mode, the transport capacity) of each resource.

        Returns:
            One entry per resource, in order: True when the connection did not
            exist (delete is a no-op), the result of the connection delete
            otherwise, or False when the key yields no service UUID or no
            constellation is available.
        """
        LOGGER.info(f"DeleteConfig {resources=}")

        # Input looks like:
        #  resources=[('/service[c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical]', '{"uuid": "c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical"}')]
        if not resources:
            return []

        # The constellation is needed to know whether transport capacities must
        # be removed too; it may not be cached if DeleteConfig is the first call.
        constellation = self.__ensure_constellation()
        if constellation is None:
            LOGGER.error("DeleteConfig: no valid constellation")
            return [False] * len(resources)

        results = []
        for key, _config in resources:
            service_uuid = self.__cm_connection.service_uuid(key)
            if not service_uuid:
                # Keep results aligned with resources, same as SetConfig.
                results.append(False)
                continue

            connection = self.__cm_connection.get_connection_by_teraflow_uuid(service_uuid)
            if connection is None:
                LOGGER.info(f"DeleteConfig: Connection {service_uuid} does not exist, delete is no-op")
                results.append(True)
                continue

            was_deleted = self.__cm_connection.delete_connection(connection.href)
            if was_deleted:
                LOGGER.info(f"DeleteConfig: Connection {service_uuid} deleted (was {str(connection)})")
            else:
                LOGGER.error(f"DeleteConfig: Connection {service_uuid} delete failure (was {str(connection)})")

            if constellation.is_vti_mode():
                active_tc = self.__cm_connection.get_transport_capacity_by_teraflow_uuid(service_uuid)
                if active_tc is not None:
                    if self.__cm_connection.delete_transport_capacity(active_tc.href):
                        LOGGER.info(f"DeleteConfig: Transport Capacity {active_tc} deleted")
                    else:
                        LOGGER.error(f"DeleteConfig: Transport Capacity {active_tc} delete failure")

            results.append(was_deleted)

        return results

    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Not supported: returns False for every requested subscription."""
        return [False for _ in subscriptions]

    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Not supported: returns False for every requested subscription."""
        return [False for _ in subscriptions]

    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """Not supported: always returns an empty list; both arguments are ignored."""
        return []