Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging, requests, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
#from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service
import json
from .CmConnection import CmConnection
# Don't complain about non-verified SSL certificate. This driver is demo only
# and CM is not provisioned in demos with a proper certificate.
import urllib3
urllib3.disable_warnings()
LOGGER = logging.getLogger(__name__)
class XrDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None: # pylint: disable=super-init-not-called
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__timeout = int(settings.get('timeout', 120))
# Mandatory key, an exception will get thrown if missing
self.__hub_module_name = settings["hub_module_name"]
tls_verify = False # Currently using self signed certificates
username = settings["username"] if "username" in settings else "xr-user-1"
password = settings["password"] if "password" in settings else "xr-user-1"
self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify)
LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, {settings=}")
def Connect(self) -> bool:
with self.__lock:
if self.__started.is_set(): return True
if not self.__cm_connection.Connect():
return False
else:
self.__started.set()
return True
def Disconnect(self) -> bool:
with self.__lock:
self.__terminate.set()
return True
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
with self.__lock:
return []
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
if constellation:
_cid, if_list = constellation
return [(f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}}) for ifname in if_list]
else:
return []
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"SetConfig {resources=}");
# Logged config seems like:
#[('/service[52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "XR HUB 1|XR-T4", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR LEAF 1|XR-T1", "uuid": "52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical"}')]
results = []
if len(resources) == 0:
return results
for key, config in resources:
service_uuid = self.__cm_connection.service_uuid(key)
if service_uuid:
config = json.loads(config)
href = self.__cm_connection.create_or_update_connection_ifnames(service_uuid, config["input_sip"], config["output_sip"])
if href:
LOGGER.info(f"SetConfig: Created service {service_uuid} as {href}")
results.append(True)
else:
LOGGER.error(f"SetConfig: Service creation failure for {service_uuid}")
results.append(False)
else:
results.append(False)
return results
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"DeleteConfig {resources=}");
# Input looks like:
# resources=[('/service[c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical]', '{"uuid": "c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical"}')]
results = []
if len(resources) == 0: return results
# Temporary dummy version
for key, _config in resources:
service_uuid = self.__cm_connection.service_uuid(key)
if service_uuid:
connection = self.__cm_connection.get_connection_by_teraflow_uuid(service_uuid)
if connection is None:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} does not exist, delete is no-op")
results.append(True)
else:
was_deleted = self.__cm_connection.delete_connection(connection.href)
if was_deleted:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} deleted (was {str(connection)})")
else:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} delete failure (was {str(connection)})")
results.append(was_deleted)
else:
results.append(False)
return results
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Not supported
return [False for _ in subscriptions]
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Not supported
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# Not supported
return []