Skip to content
XrDriver.py 9.26 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging
import threading
import json
from typing import Any, Iterator, List, Optional, Tuple, Union
import urllib3
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type
from device.service.driver_api._Driver import _Driver
from .cm.cm_connection import CmConnection, ConsistencyMode
from .cm import tf

# Don't complain about non-verified SSL certificate. This driver is demo only
# and CM is not provisioned in demos with a proper certificate.
urllib3.disable_warnings()

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': 'xr'})

class XrDriver(_Driver):
    def __init__(self, address: str, port: int, **settings) -> None:    # pylint: disable=super-init-not-called
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__timeout = int(settings.get('timeout', 120))
        self.__cm_address = address
        # Mandatory key, an exception will get thrown if missing
        self.__hub_module_name = settings["hub_module_name"]

        tls_verify = False # Currently using self signed certificates
        username = settings.get("username", "xr-user-1")
        password = settings.get("password", "xr-user-1")
        
        # Options are:
        #    asynchronous --> operation considered complete when IPM responds with suitable status code,
        #                     including "accepted", that only means request is semantically good and queued.
        #    synchronous  --> operation is considered complete once result is also reflected in GETs in REST API.
        #    lifecycle    --> operation is considered successfull once IPM has completed pluggaable configuration
        #                     or failed in it. This is typically unsuitable for production use
        #                     (as some optics may be transiently unreachable), but is convenient for demos and testin.
        consistency_mode = ConsistencyMode.from_str(settings.get("consistency-mode", "asynchronous"))
        self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify, consistency_mode=consistency_mode)
        self.__constellation = None
        LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, consistency mode {str(consistency_mode)}, {settings=}")
    def __str__(self):
        return f"{self.__hub_module_name}@{self.__cm_address}"

    def Connect(self) -> bool:
        LOGGER.info(f"Connect[{self}]")
            if self.__started.is_set():
                return True
            if not self.__cm_connection.Connect():
                return False
            else:
                self.__started.set()
                return True

    def Disconnect(self) -> bool:
        LOGGER.info(f"Disconnect[{self}]")
        with self.__lock:
            self.__terminate.set()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        LOGGER.info(f"GetInitialConfig[{self}]")
    #pylint: disable=dangerous-default-value
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        LOGGER.info(f"GetConfig[{self}]: {resource_keys=}")
        chk_type('resources', resource_keys, list)

        # Empty resource_keys means all resources. As we only have endpoints, we ignore parameter and always
        # return everything.
        with self.__lock:
            constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
            if constellation:
                self.__constellation = constellation
                return [(f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}}) for ifname in constellation.ifnames()]
            else:
                return []
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"SetConfig[{self}]: {resources=}")
        # Logged config seems like:
        #[('/service[52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "XR HUB 1|XR-T4", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR LEAF 1|XR-T1", "uuid": "52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical"}')]
        # Post February 2023
        #[('/services/service[e1b9184c-767d-44b9-bf83-a1f643d82bef]', '{"capacity_unit": "GHz", "capacity_value": 50.0, "direction": "UNIDIRECTIONAL", "input_sip": "XR LEAF 1|XR-T1", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR HUB 1|XR-T4", "uuid": "e1b9184c-767d-44b9-bf83-a1f643d82bef"}')]
        with self.__lock:
            if self.__constellation is None:
                self.__constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)

            if self.__constellation is None:
                LOGGER.error("SetConfig: no valid constellation")
                return [False] * len(resources)

            results = []
            if len(resources) == 0:
                return results

            for key, config in resources:
                service_uuid = self.__cm_connection.service_uuid(key)
                if service_uuid:
                    config = json.loads(config)
                    results.append(tf.set_config_for_service(self.__cm_connection, self.__constellation, service_uuid, config))
                else:
                    results.append(False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        LOGGER.info(f"DeleteConfig[{self}]: {resources=}")

        # Input looks like:
        #  resources=[('/service[c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical]', '{"uuid": "c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical"}')]
        with self.__lock:
            results = []
            if len(resources) == 0:
                return results

            # Temporary dummy version
            for key, _config in resources:
                service_uuid = self.__cm_connection.service_uuid(key)
                if service_uuid:
                    connection = self.__cm_connection.get_connection_by_teraflow_uuid(service_uuid)
                    if connection is None:
                        LOGGER.info(f"DeleteConfig: Connection {service_uuid} does not exist, delete is no-op")
                        results.append(True)
                        was_deleted = self.__cm_connection.delete_connection(connection.href)
                        if was_deleted:
                            LOGGER.info(f"DeleteConfig: Connection {service_uuid} deleted (was {str(connection)})")
                        else:
                            LOGGER.info(f"DeleteConfig: Connection {service_uuid} delete failure (was {str(connection)})")

                            active_tc = self.__cm_connection.get_transport_capacity_by_teraflow_uuid(service_uuid)
                            if active_tc is not None:
                                if self.__cm_connection.delete_transport_capacity(active_tc.href):
                                    LOGGER.info(f"DeleteConfig: Transport Capacity {active_tc} deleted")
                                else:
                                    LOGGER.error(f"DeleteConfig: Transport Capacity {active_tc} delete failure")

                        results.append(was_deleted)
                else:
                    results.append(False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        # Not supported
        return [False for _ in subscriptions]

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        # Not supported
        return [False for _ in subscriptions]

    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        # Not supported
        return []