# Newer
# Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import json
import logging
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

import urllib3

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type
from device.service.driver_api._Driver import _Driver

# `tf` provides set_config_for_service(), which SetConfig calls.
# NOTE(review): assumed to live in the sibling `cm` package next to cm_connection - confirm.
from .cm import tf
from .cm.cm_connection import CmConnection, ConsistencyMode
# Don't complain about non-verified SSL certificate. This driver is demo only
# and CM is not provisioned in demos with a proper certificate.
# NOTE: this is a module-level side effect executed at import time; per the urllib3
# documentation it installs an "ignore" warning filter, so it is not limited to this driver.
urllib3.disable_warnings()
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool for this driver, labelled with driver name 'xr'.
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': 'xr'})
class XrDriver(_Driver):
    """Device driver that talks to a CM instance through `CmConnection`.

    The driver is bound to a single hub module (setting ``hub_module_name``) and
    caches the constellation object returned for that hub. All public operations
    that touch the CM connection or the cached constellation are serialised with
    an internal lock.

    NOTE(review): `metered_subclass_method` and `METRICS_POOL` are available at
    module level but are not applied to any method of this class - confirm
    whether the driver API methods are expected to be decorated.
    """

    def __init__(self, address: str, port: int, **settings) -> None: # pylint: disable=super-init-not-called
        """Create the driver and its (not yet connected) CM connection.

        Args:
            address: CM host address.
            port: CM port; converted with ``int()``.
            **settings: Driver settings. Recognised keys:
                ``hub_module_name`` (mandatory), ``timeout`` (default 120),
                ``username`` / ``password`` (default ``"xr-user-1"``),
                ``consistency-mode`` (default ``"asynchronous"``).

        Raises:
            KeyError: if ``hub_module_name`` is missing from ``settings``.
        """
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__timeout = int(settings.get('timeout', 120))
        self.__cm_address = address
        # Mandatory key, an exception will get thrown if missing
        self.__hub_module_name = settings["hub_module_name"]
        # Cached constellation for the hub; filled lazily by GetConfig/SetConfig/DeleteConfig.
        # Must exist from construction on, because those methods test it against None.
        self.__constellation = None

        tls_verify = False # Currently using self signed certificates
        username = settings.get("username", "xr-user-1")
        password = settings.get("password", "xr-user-1")

        # Options are:
        #    asynchronous --> operation considered complete when IPM responds with suitable status code,
        #                     including "accepted", that only means request is semantically good and queued.
        #    synchronous  --> operation is considered complete once result is also reflected in GETs in REST API.
        #    lifecycle    --> operation is considered successful once IPM has completed pluggable configuration
        #                     or failed in it. This is typically unsuitable for production use
        #                     (as some optics may be transiently unreachable), but is convenient for demos and testing.
        consistency_mode = ConsistencyMode.from_str(settings.get("consistency-mode", "asynchronous"))

        self.__cm_connection = CmConnection(
            address, int(port), username, password, self.__timeout,
            tls_verify=tls_verify, consistency_mode=consistency_mode)

        # Never write the CM password to the log; everything else is logged as given.
        logged_settings = {key: ("<redacted>" if key == "password" else value) for key, value in settings.items()}
        LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, consistency mode {str(consistency_mode)}, settings={logged_settings!r}")

    def __str__(self):
        """Return ``<hub module name>@<CM address>``; used as the driver tag in log lines."""
        return f"{self.__hub_module_name}@{self.__cm_address}"

    def _ensure_constellation(self):
        """Return the cached constellation, querying CM by hub name if none is cached yet.

        Must be called with ``self.__lock`` held. Returns whatever the lookup
        yields; callers compare the result against ``None``.
        """
        if self.__constellation is None:
            self.__constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
        return self.__constellation

    def Connect(self) -> bool:
        """Connect to CM unless already started.

        Returns:
            True if the driver was already started or the CM connection
            reported success; False if the CM connection attempt failed.
        """
        LOGGER.info(f"Connect[{self}]")
        with self.__lock:
            if self.__started.is_set():
                return True
            if not self.__cm_connection.Connect():
                return False
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Flag the driver as terminating. Always returns True."""
        LOGGER.info(f"Disconnect[{self}]")
        with self.__lock:
            self.__terminate.set()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return the initial configuration, which is always an empty list for this driver."""
        LOGGER.info(f"GetInitialConfig[{self}]")
        with self.__lock:
            return []

    #pylint: disable=dangerous-default-value
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Return one endpoint entry per interface name of the hub's constellation.

        Args:
            resource_keys: Must be a list (type-checked). The contents are ignored:
                empty means "all resources", and as only endpoints exist,
                everything is always returned. The default list is never mutated.

        Returns:
            A list of ``("/endpoints/endpoint[<ifname>]", {...})`` tuples, or an
            empty list when no constellation is found for the hub.
        """
        LOGGER.info(f"GetConfig[{self}]: {resource_keys=}")
        chk_type('resources', resource_keys, list)

        # Empty resource_keys means all resources. As we only have endpoints, we ignore parameter and always
        # return everything.

        with self.__lock:
            constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
            if not constellation:
                return []
            # Refresh the cache so that SetConfig/DeleteConfig see the latest constellation.
            self.__constellation = constellation
            return [
                (f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}})
                for ifname in constellation.ifnames()
            ]

    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Apply service configuration for each ``(key, json_config)`` resource.

        Args:
            resources: ``(resource key, JSON string)`` pairs. The service UUID is
                extracted from the key by the CM connection.

        Returns:
            One result per resource, in order: the value returned by
            ``tf.set_config_for_service`` for recognised service keys, False for
            keys with no service UUID. If no constellation can be resolved, a list
            of False of the same length as ``resources``.

        Raises:
            Whatever ``json.loads`` raises for a malformed configuration string.
        """
        LOGGER.info(f"SetConfig[{self}]: {resources=}")

        # Logged config seems like:
        #[('/service[52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "XR HUB 1|XR-T4", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR LEAF 1|XR-T1", "uuid": "52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical"}')]

        with self.__lock:
            if self._ensure_constellation() is None:
                LOGGER.error("SetConfig: no valid constellation")
                return [False] * len(resources)

            results = []
            for key, config in resources:
                service_uuid = self.__cm_connection.service_uuid(key)
                if service_uuid:
                    config = json.loads(config)
                    results.append(tf.set_config_for_service(self.__cm_connection, self.__constellation, service_uuid, config))
                else:
                    results.append(False)

            return results

    def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Delete the CM connection (and, in VTI mode, the transport capacity) of each service.

        Args:
            resources: ``(resource key, config)`` pairs; only the key is used.

        Returns:
            One result per resource, in order: False for keys with no service
            UUID, True when the connection does not exist (delete is a no-op),
            otherwise the result of deleting the connection.
        """
        LOGGER.info(f"DeleteConfig[{self}]: {resources=}")

        # Input looks like:
        #  resources=[('/service[c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical]', '{"uuid": "c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical"}')]

        with self.__lock:
            results = []
            if not resources:
                return results

            # The constellation is only needed to find out whether transport capacities must be
            # cleaned up too. It may not be cached yet if DeleteConfig is the first call made.
            constellation = self._ensure_constellation()
            if constellation is None:
                LOGGER.warning("DeleteConfig: no valid constellation, transport capacity cleanup is skipped")

            # Temporary dummy version
            for key, _config in resources:
                service_uuid = self.__cm_connection.service_uuid(key)
                if not service_uuid:
                    results.append(False)
                    continue

                connection = self.__cm_connection.get_connection_by_teraflow_uuid(service_uuid)
                if connection is None:
                    LOGGER.info(f"DeleteConfig: Connection {service_uuid} does not exist, delete is no-op")
                    results.append(True)
                    # Nothing to delete; must not fall through and dereference the missing connection.
                    continue

                was_deleted = self.__cm_connection.delete_connection(connection.href)
                if was_deleted:
                    LOGGER.info(f"DeleteConfig: Connection {service_uuid} deleted (was {str(connection)})")
                else:
                    LOGGER.info(f"DeleteConfig: Connection {service_uuid} delete failure (was {str(connection)})")

                if constellation is not None and constellation.is_vti_mode():
                    active_tc = self.__cm_connection.get_transport_capacity_by_teraflow_uuid(service_uuid)
                    if active_tc is not None:
                        if self.__cm_connection.delete_transport_capacity(active_tc.href):
                            LOGGER.info(f"DeleteConfig: Transport Capacity {active_tc} deleted")
                        else:
                            LOGGER.error(f"DeleteConfig: Transport Capacity {active_tc} delete failure")

                # The reported result reflects the connection delete only, not the transport capacity.
                results.append(was_deleted)

            return results

    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Not supported: return False for every requested subscription."""
        # Not supported
        return [False for _ in subscriptions]

    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Not supported: return False for every requested subscription."""
        # Not supported
        return [False for _ in subscriptions]

    def GetState(
        self, blocking=False, terminate : Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """Not supported: both arguments are ignored and an empty list is returned."""
        # Not supported
        return []