Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 974 additions and 714 deletions
......@@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import copy, json
from typing import Dict, List, Optional, Tuple, Union
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import DeviceDriverEnum
from common.proto.context_pb2 import DeviceDriverEnum, LinkTypeEnum
def get_descriptors_add_contexts(contexts : List[Dict]) -> List[Dict]:
    """Prepare context descriptors for an 'add' operation.

    Returns deep copies of the given context descriptors with their dependent
    collections (topology_ids, service_ids, slice_ids) reset to empty lists,
    so the contexts can be created before any dependent object exists. The
    input list and its items are left untouched.
    """
    prepared_contexts = []
    for original_context in contexts:
        context_copy = copy.deepcopy(original_context)
        context_copy['topology_ids'] = []
        context_copy['service_ids'] = []
        context_copy['slice_ids'] = []
        prepared_contexts.append(context_copy)
    return prepared_contexts
def get_descriptors_add_topologies(topologies : List[Dict]) -> List[Dict]:
......@@ -29,6 +31,7 @@ def get_descriptors_add_topologies(topologies : List[Dict]) -> List[Dict]:
for topology in topologies_add:
topology['device_ids'] = []
topology['link_ids'] = []
topology['optical_link_ids'] = []
return topologies_add
def get_descriptors_add_services(services : List[Dict]) -> List[Dict]:
......@@ -129,3 +132,30 @@ def split_controllers_and_network_devices(devices : List[Dict]) -> Tuple[List[Di
else:
network_devices.append(device)
return controllers, network_devices
def link_type_to_str(link_type : Union[int, str]) -> Optional[str]:
    """Normalize a link type, given as an enum int value or enum name string,
    to its canonical LinkTypeEnum name.

    Returns:
        The canonical enum name string, or None when link_type is neither int
        nor str, or does not map to a known LinkTypeEnum entry. Protobuf enum
        wrappers raise ValueError for unknown names/values; that is converted
        to None here so callers (e.g. split_links_by_type) can report an
        'Unsupported LinkType' error instead of leaking a ValueError.
    """
    try:
        if isinstance(link_type, int): return LinkTypeEnum.Name(link_type)
        if isinstance(link_type, str): return LinkTypeEnum.Name(LinkTypeEnum.Value(link_type))
    except ValueError:
        # Unknown enum value or name: treat as unsupported, per return contract.
        return None
    return None
def split_links_by_type(links : List[Dict]) -> Dict[str, List[Dict]]:
    """Group link descriptors by their link_type into 'normal' (unknown,
    copper, radio), 'optical' (fiber) and 'virtual' buckets.

    Raises:
        Exception: when a link carries a link_type that cannot be resolved to
        a known LinkTypeEnum entry.
    """
    categorized_links = collections.defaultdict(list)
    NORMAL_LINK_TYPES = {
        LinkTypeEnum.LINKTYPE_UNKNOWN, LinkTypeEnum.LINKTYPE_COPPER, LinkTypeEnum.LINKTYPE_RADIO
    }
    for link in links:
        raw_link_type = link.get('link_type', LinkTypeEnum.LINKTYPE_UNKNOWN)
        str_link_type = link_type_to_str(raw_link_type)
        if str_link_type is None:
            MSG = 'Unsupported LinkType in Link({:s})'
            raise Exception(MSG.format(str(link)))
        numeric_link_type = LinkTypeEnum.Value(str_link_type)
        if numeric_link_type in NORMAL_LINK_TYPES:
            categorized_links['normal'].append(link)
        elif numeric_link_type == LinkTypeEnum.LINKTYPE_FIBER:
            categorized_links['optical'].append(link)
        elif numeric_link_type == LinkTypeEnum.LINKTYPE_VIRTUAL:
            categorized_links['virtual'].append(link)
        else:
            MSG = 'Unsupported LinkType({:s}) in Link({:s})'
            raise Exception(MSG.format(str_link_type, str(link)))
    return categorized_links
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
......@@ -21,6 +21,12 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
KAFKA_TOPIC_NUM_PARTITIONS = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1
KAFKA_TOPIC_LIST_TIMEOUT = 5
TOPIC_CREATE_WAIT_ITERATIONS = 10
TOPIC_CREATE_WAIT_TIME = 1
class KafkaConfig(Enum):
@staticmethod
......@@ -35,59 +41,87 @@ class KafkaConfig(Enum):
@staticmethod
def get_admin_client():
SERVER_ADDRESS = KafkaConfig.get_kafka_address()
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS})
return ADMIN_CLIENT
class KafkaTopic(Enum):
# TODO: Later to be populated from ENV variable.
TELEMETRY_REQUEST = 'topic_telemetry_request'
TELEMETRY_RESPONSE = 'topic_telemetry_response'
RAW = 'topic_raw'
LABELED = 'topic_labeled'
VALUE = 'topic_value'
ALARMS = 'topic_alarms'
ANALYTICS_REQUEST = 'topic_analytics_request'
ANALYTICS_RESPONSE = 'topic_analytics_response'
TELEMETRY_REQUEST = 'topic_telemetry_request'
TELEMETRY_RESPONSE = 'topic_telemetry_response'
RAW = 'topic_raw'
LABELED = 'topic_labeled'
VALUE = 'topic_value'
ALARMS = 'topic_alarms'
ANALYTICS_REQUEST = 'topic_analytics_request'
ANALYTICS_RESPONSE = 'topic_analytics_response'
VNTMANAGER_REQUEST = 'topic_vntmanager_request'
VNTMANAGER_RESPONSE = 'topic_vntmanager_response'
NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'
@staticmethod
def create_all_topics() -> bool:
"""
'''
Method to create Kafka topics defined as class members
"""
all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully or Already Exists")
'''
LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
kafka_admin_client = KafkaConfig.get_admin_client()
topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
existing_topics = set(topic_metadata.topics.keys())
LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
missing_topics = [
NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
for topic in KafkaTopic
if topic.value not in existing_topics
]
LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))
if len(missing_topics) == 0:
LOGGER.debug('All topics already existed.')
return True
else:
LOGGER.debug("Error creating all topics")
return False
@staticmethod
def create_new_topic_if_not_exists(new_topics: list) -> bool:
"""
Method to create Kafka topic if it does not exist.
Args:
list of topic: containing the topic name(s) to be created on Kafka
"""
LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
for topic in new_topics:
create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
failed_topic_creations = set()
for topic, future in create_topic_future_map.items():
try:
topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.get_admin_client().create_topics([new_topic])
else:
print("Topic name already exists: {:}".format(topic))
LOGGER.debug("Topic name already exists: {:}".format(topic))
except Exception as e:
LOGGER.debug("Failed to create topic: {:}".format(e))
return False
return True
LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
future.result() # Blocks until topic is created or raises an exception
LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
except: # pylint: disable=bare-except
LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
failed_topic_creations.add(topic)
if len(failed_topic_creations) > 0: return False
LOGGER.debug('All topics created.')
# Wait until topics appear in metadata
desired_topics = {topic.value for topic in KafkaTopic}
missing_topics = set()
for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
existing_topics = set(topic_metadata.topics.keys())
missing_topics = desired_topics.difference(existing_topics)
if len(missing_topics) == 0: break
MSG = 'Waiting for Topics({:s}) to appear in metadata...'
LOGGER.debug(MSG.format(str(missing_topics)))
time.sleep(TOPIC_CREATE_WAIT_TIME)
if len(missing_topics) > 0:
MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
LOGGER.error(MSG.format(str(missing_topics)))
return False
else:
LOGGER.debug('All topics created and available.')
return True
# TODO: create all topics after the deployments (Telemetry and Analytics)
if __name__ == '__main__':
import os
if 'KFK_SERVER_ADDRESS' not in os.environ:
os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
KafkaTopic.create_all_topics()
......@@ -15,6 +15,8 @@
import copy
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import LinkTypeEnum
def get_link_uuid(a_endpoint_id : Dict, z_endpoint_id : Dict) -> str:
return '{:s}/{:s}=={:s}/{:s}'.format(
a_endpoint_id['device_id']['device_uuid']['uuid'], a_endpoint_id['endpoint_uuid']['uuid'],
......@@ -25,9 +27,13 @@ def json_link_id(link_uuid : str) -> Dict:
def json_link(
link_uuid : str, endpoint_ids : List[Dict], name : Optional[str] = None,
link_type : LinkTypeEnum = LinkTypeEnum.LINKTYPE_UNKNOWN,
total_capacity_gbps : Optional[float] = None, used_capacity_gbps : Optional[float] = None
) -> Dict:
result = {'link_id': json_link_id(link_uuid), 'link_endpoint_ids': copy.deepcopy(endpoint_ids)}
result = {
'link_id': json_link_id(link_uuid), 'link_type': link_type,
'link_endpoint_ids': copy.deepcopy(endpoint_ids),
}
if name is not None: result['name'] = name
if total_capacity_gbps is not None:
attributes : Dict = result.setdefault('attributes', dict())
......
......@@ -30,10 +30,10 @@ def json_service_id(service_uuid : str, context_id : Optional[Dict] = None):
def json_service(
service_uuid : str, service_type : ServiceTypeEnum, context_id : Optional[Dict] = None,
status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED,
endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []):
return {
name : Optional[str] = None, status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED,
endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []
) -> Dict:
result = {
'service_id' : json_service_id(service_uuid, context_id=context_id),
'service_type' : service_type,
'service_status' : {'service_status': status},
......@@ -41,6 +41,8 @@ def json_service(
'service_constraints' : copy.deepcopy(constraints),
'service_config' : {'config_rules': copy.deepcopy(config_rules)},
}
if name is not None: result['name'] = name
return result
def json_service_qkd_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
......
......@@ -78,4 +78,4 @@ class OpticalLinkEndPointModel(_Base):
__table_args__ = (
CheckConstraint(position >= 0, name='check_position_value'),
)
\ No newline at end of file
)
......@@ -82,7 +82,7 @@ DRIVERS.append(
]))
from .ietf_l3vpn.driver import IetfL3VpnDriver # pylint: disable=wrong-import-position
from .ietf_l3vpn.IetfL3VpnDriver import IetfL3VpnDriver # pylint: disable=wrong-import-position
DRIVERS.append(
(IetfL3VpnDriver, [
{
......@@ -188,7 +188,10 @@ if LOAD_ALL_DEVICE_DRIVERS:
DRIVERS.append(
(OpticalTfsDriver, [
{
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPEN_LINE_SYSTEM,
FilterFieldEnum.DEVICE_TYPE: [
DeviceTypeEnum.OPEN_LINE_SYSTEM,
DeviceTypeEnum.TERAFLOWSDN_CONTROLLER,
],
FilterFieldEnum.DRIVER: DeviceDriverEnum.DEVICEDRIVER_OPTICAL_TFS,
}
]))
......
......@@ -70,9 +70,10 @@ class IetfL2VpnDriver(_Driver):
def Connect(self) -> bool:
with self.__lock:
if self.__started.is_set(): return True
try:
self.wim.check_credentials()
except Exception: # pylint: disable=broad-except
except: # pylint: disable=bare-except
LOGGER.exception('Exception checking credentials')
return False
else:
......
......@@ -13,20 +13,13 @@
# limitations under the License.
import logging, requests
from requests.auth import HTTPBasicAuth
from typing import Dict, List, Optional
from common.tools.client.RestClient import RestClient
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = '{:s}://{:s}:{:d}/tfs-api/devices'
GET_LINKS_URL = '{:s}://{:s}:{:d}/tfs-api/links'
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
GET_CONTEXT_IDS_URL = '/tfs-api/context_ids'
GET_DEVICES_URL = '/tfs-api/devices'
GET_LINKS_URL = '/tfs-api/links'
MAPPING_STATUS = {
'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
......@@ -47,36 +40,44 @@ MAPPING_DRIVER = {
'DEVICEDRIVER_OPTICAL_TFS' : 9,
'DEVICEDRIVER_IETF_ACTN' : 10,
'DEVICEDRIVER_OC' : 11,
'DEVICEDRIVER_QKD' : 12,
'DEVICEDRIVER_IETF_L3VPN' : 13,
'DEVICEDRIVER_IETF_SLICE' : 14,
'DEVICEDRIVER_NCE' : 15,
}
MSG_ERROR = 'Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}'
LOGGER = logging.getLogger(__name__)
class TfsApiClient:
class TfsApiClient(RestClient):
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None
username : Optional[str] = None, password : Optional[str] = None,
timeout : Optional[int] = 30
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
def get_devices_endpoints(self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES) -> List[Dict]:
super().__init__(
address, port, scheme=scheme, username=username, password=password,
timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
)
def check_credentials(self) -> None:
    # Lightweight credentials/connectivity probe: issue a GET against the
    # remote TFS NBI context_ids endpoint; RestClient.get enforces the
    # expected HTTP OK status, so any auth/transport failure surfaces here.
    self.get(GET_CONTEXT_IDS_URL, expected_status_codes={requests.codes['OK']})
    LOGGER.info('Credentials checked')
def get_devices_endpoints(
self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug('[get_devices_endpoints] begin')
LOGGER.debug('[get_devices_endpoints] import_topology={:s}'.format(str(import_topology)))
reply = requests.get(self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(str(self._devices_url), str(reply.status_code), str(reply))
LOGGER.error(msg)
raise Exception(msg)
MSG = '[get_devices_endpoints] import_topology={:s}'
LOGGER.debug(MSG.format(str(import_topology)))
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception('Unsupported import_topology mode: {:s}'.format(str(import_topology)))
MSG = 'Unsupported import_topology mode: {:s}'
raise Exception(MSG.format(str(import_topology)))
devices = self.get(GET_DEVICES_URL, expected_status_codes={requests.codes['OK']})
result = list()
for json_device in reply.json()['devices']:
for json_device in devices['devices']:
device_uuid : str = json_device['device_id']['device_uuid']['uuid']
device_type : str = json_device['device_type']
#if not device_type.startswith('emu-'): device_type = 'emu-' + device_type
......@@ -87,7 +88,10 @@ class TfsApiClient:
'name': json_device['name'],
'type': device_type,
'status': MAPPING_STATUS[device_status],
'drivers': [MAPPING_DRIVER[driver] for driver in json_device['device_drivers']],
'drivers': [
MAPPING_DRIVER[driver]
for driver in json_device['device_drivers']
],
}
result.append((device_url, device_data))
......@@ -106,17 +110,16 @@ class TfsApiClient:
LOGGER.debug('[get_devices_endpoints] devices only; returning')
return result
reply = requests.get(self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(str(self._links_url), str(reply.status_code), str(reply))
LOGGER.error(msg)
raise Exception(msg)
links = self.get(GET_LINKS_URL, expected_status_codes={requests.codes['OK']})
for json_link in reply.json()['links']:
for json_link in links['links']:
link_uuid : str = json_link['link_id']['link_uuid']['uuid']
link_url = '/links/link[{:s}]'.format(link_uuid)
link_endpoint_ids = [
(json_endpoint_id['device_id']['device_uuid']['uuid'], json_endpoint_id['endpoint_uuid']['uuid'])
(
json_endpoint_id['device_id']['device_uuid']['uuid'],
json_endpoint_id['endpoint_uuid']['uuid'],
)
for json_endpoint_id in json_link['link_endpoint_ids']
]
link_data = {
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -12,41 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import re
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union
import anytree
import requests
from requests.auth import HTTPBasicAuth
import anytree, json, logging, re, requests, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_length, chk_string, chk_type
from device.service.driver_api._Driver import (
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
_Driver,
)
from device.service.driver_api.AnyTreeTools import (
TreeNode,
dump_subtree,
get_subnode,
set_subnode_value,
)
from device.service.driver_api.ImportTopologyEnum import (
ImportTopologyEnum,
get_import_topology,
)
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_SERVICES
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum, get_import_topology
from .Constants import SPECIAL_RESOURCE_MAPPINGS
from .TfsApiClient import TfsApiClient
from .Tools import compose_resource_endpoint
LOGGER = logging.getLogger(__name__)
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
......@@ -57,40 +36,34 @@ RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r"^\/interface\[([^\]]+)\].*")
RE_IETF_L3VPN_DATA = re.compile(r"^\/service\[[^\]]+\]\/IETFL3VPN$")
RE_IETF_L3VPN_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/IETFL3VPN\/operation$")
DRIVER_NAME = "ietf_l3vpn"
METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME})
DRIVER_NAME = 'ietf_l3vpn'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class IetfL3VpnDriver(_Driver):
def __init__(self, address: str, port: str, **settings) -> None:
def __init__(self, address : str, port : str, **settings) -> None:
super().__init__(DRIVER_NAME, address, int(port), **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__running = TreeNode(".")
scheme = self.settings.get("scheme", "http")
username = self.settings.get("username")
password = self.settings.get("password")
self.__running = TreeNode('.')
username = self.settings.get('username')
password = self.settings.get('password')
scheme = self.settings.get('scheme', 'http')
timeout = int(self.settings.get('timeout', 60))
self.tac = TfsApiClient(
self.address,
self.port,
scheme=scheme,
username=username,
password=password,
)
self.__auth = None
# (
# HTTPBasicAuth(username, password)
# if username is not None and password is not None
# else None
# )
self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format(
scheme, self.address, int(self.port)
)
self.__timeout = int(self.settings.get("timeout", 120))
self.__import_topology = get_import_topology(
self.settings, default=ImportTopologyEnum.DEVICES
self.address, self.port, scheme=scheme, username=username,
password=password, timeout=timeout
)
#self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format(scheme, self.address, int(self.port))
# Options are:
# disabled --> just import endpoints as usual
# devices --> imports sub-devices but not links connecting them.
# (a remotely-controlled transport domain might exist between them)
# topology --> imports sub-devices and links connecting them.
# (not supported by XR driver)
self.__import_topology = get_import_topology(self.settings, default=ImportTopologyEnum.DEVICES)
endpoints = self.settings.get("endpoints", [])
endpoint_resources = []
for endpoint in endpoints:
......@@ -139,20 +112,12 @@ class IetfL3VpnDriver(_Driver):
return results
def Connect(self) -> bool:
url = (
self.__tfs_nbi_root + "/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
)
with self.__lock:
if self.__started.is_set():
return True
if self.__started.is_set(): return True
try:
# requests.get(url, timeout=self.__timeout, auth=self.__auth)
...
except requests.exceptions.Timeout:
LOGGER.exception("Timeout connecting {:s}".format(url))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception("Exception connecting {:s}".format(url))
self.tac.check_credentials()
except: # pylint: disable=bare-except
LOGGER.exception('Exception checking credentials')
return False
else:
self.__started.set()
......@@ -170,50 +135,46 @@ class IetfL3VpnDriver(_Driver):
@metered_subclass_method(METRICS_POOL)
def GetConfig(
self, resource_keys: List[str] = []
self, resource_keys : List[str] = []
) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type("resources", resource_keys, list)
chk_type('resources', resource_keys, list)
results = []
with self.__lock:
if len(resource_keys) == 0:
return dump_subtree(self.__running)
results = []
resolver = anytree.Resolver(pathattr="name")
self.tac.check_credentials()
if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
#if len(resource_keys) == 0:
# return dump_subtree(self.__running)
resolver = anytree.Resolver(pathattr='name')
for i, resource_key in enumerate(resource_keys):
str_resource_name = "resource_key[#{:d}]".format(i)
str_resource_name = 'resource_key[#{:d}]'.format(i)
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
resource_key = SPECIAL_RESOURCE_MAPPINGS.get(
resource_key, resource_key
)
resource_path = resource_key.split("/")
except Exception as e: # pylint: disable=broad-except
LOGGER.exception(
"Exception validating {:s}: {:s}".format(
str_resource_name, str(resource_key)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(self.tac.get_devices_endpoints(self.__import_topology))
else:
resource_key = SPECIAL_RESOURCE_MAPPINGS.get(
resource_key, resource_key
)
)
results.append(
(resource_key, e)
) # if validation fails, store the exception
continue
resource_node = get_subnode(
resolver, self.__running, resource_path, default=None
)
# if not found, resource_node is None
if resource_node is None:
continue
results.extend(dump_subtree(resource_node))
return results
resource_path = resource_key.split('/')
resource_node = get_subnode(
resolver, self.__running, resource_path, default=None
)
# if not found, resource_node is None
if resource_node is None: continue
results.extend(dump_subtree(resource_node))
except Exception as e:
MSG = 'Unhandled error processing {:s}: resource_key({:s})'
LOGGER.exception(MSG.format(str_resource_name, str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(
self, resources: List[Tuple[str, Any]]
self, resources : List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
if len(resources) == 0: return results
with self.__lock:
for resource in resources:
resource_key, resource_value = resource
......@@ -224,7 +185,7 @@ class IetfL3VpnDriver(_Driver):
else:
raise Exception("operation type not found in resources")
for resource in resources:
LOGGER.info("resource = {:s}".format(str(resource)))
LOGGER.info('resource = {:s}'.format(str(resource)))
resource_key, resource_value = resource
if not RE_IETF_L3VPN_DATA.match(resource_key):
continue
......@@ -261,7 +222,7 @@ class IetfL3VpnDriver(_Driver):
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(
self, resources: List[Tuple[str, Any]]
self, resources : List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
......@@ -290,20 +251,20 @@ class IetfL3VpnDriver(_Driver):
@metered_subclass_method(METRICS_POOL)
def SubscribeState(
self, subscriptions: List[Tuple[str, float, float]]
self, subscriptions : List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: IETF L3VPN does not support monitoring by now
# TODO: does not support monitoring by now
return [False for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(
self, subscriptions: List[Tuple[str, float, float]]
self, subscriptions : List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: IETF L3VPN does not support monitoring by now
# TODO: does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate: Optional[threading.Event] = None
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# TODO: IETF L3VPN does not support monitoring by now
# TODO: does not support monitoring by now
return []
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -12,176 +12,152 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging, requests
from typing import Dict, List, Optional
import requests
from requests.auth import HTTPBasicAuth
from common.tools.client.RestClient import RestClient
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices"
GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links"
L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
GET_CONTEXT_IDS_URL = '/tfs-api/context_ids'
GET_DEVICES_URL = '/tfs-api/devices'
GET_LINKS_URL = '/tfs-api/links'
L3VPN_URL = '/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services'
MAPPING_STATUS = {
"DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
"DEVICEOPERATIONALSTATUS_DISABLED": 1,
"DEVICEOPERATIONALSTATUS_ENABLED": 2,
'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
'DEVICEOPERATIONALSTATUS_DISABLED' : 1,
'DEVICEOPERATIONALSTATUS_ENABLED' : 2,
}
MAPPING_DRIVER = {
"DEVICEDRIVER_UNDEFINED": 0,
"DEVICEDRIVER_OPENCONFIG": 1,
"DEVICEDRIVER_TRANSPORT_API": 2,
"DEVICEDRIVER_P4": 3,
"DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
"DEVICEDRIVER_ONF_TR_532": 5,
"DEVICEDRIVER_XR": 6,
"DEVICEDRIVER_IETF_L2VPN": 7,
"DEVICEDRIVER_GNMI_OPENCONFIG": 8,
"DEVICEDRIVER_OPTICAL_TFS": 9,
"DEVICEDRIVER_IETF_ACTN": 10,
"DEVICEDRIVER_OC": 11,
'DEVICEDRIVER_UNDEFINED' : 0,
'DEVICEDRIVER_OPENCONFIG' : 1,
'DEVICEDRIVER_TRANSPORT_API' : 2,
'DEVICEDRIVER_P4' : 3,
'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4,
'DEVICEDRIVER_ONF_TR_532' : 5,
'DEVICEDRIVER_XR' : 6,
'DEVICEDRIVER_IETF_L2VPN' : 7,
'DEVICEDRIVER_GNMI_OPENCONFIG' : 8,
'DEVICEDRIVER_OPTICAL_TFS' : 9,
'DEVICEDRIVER_IETF_ACTN' : 10,
'DEVICEDRIVER_OC' : 11,
'DEVICEDRIVER_QKD' : 12,
'DEVICEDRIVER_IETF_L3VPN' : 13,
'DEVICEDRIVER_IETF_SLICE' : 14,
'DEVICEDRIVER_NCE' : 15,
}
MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}"
LOGGER = logging.getLogger(__name__)
class TfsApiClient:
class TfsApiClient(RestClient):
def __init__(
self,
address: str,
port: int,
scheme: str = "http",
username: Optional[str] = None,
password: Optional[str] = None,
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : Optional[int] = 30
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._l3vpn_url = L3VPN_URL.format(scheme, address, port)
self._auth = None
# (
# HTTPBasicAuth(username, password)
# if username is not None and password is not None
# else None
# )
super().__init__(
address, port, scheme=scheme, username=username, password=password,
timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
)
def check_credentials(self) -> None:
    # Lightweight credentials/connectivity probe: issue a GET against the
    # remote TFS NBI context_ids endpoint; RestClient.get enforces the
    # expected HTTP OK status, so any auth/transport failure surfaces here.
    self.get(GET_CONTEXT_IDS_URL, expected_status_codes={requests.codes['OK']})
    LOGGER.info('Credentials checked')
def get_devices_endpoints(
self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES
self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug("[get_devices_endpoints] begin")
LOGGER.debug(
"[get_devices_endpoints] import_topology={:s}".format(str(import_topology))
)
reply = requests.get(self._devices_url, timeout=TIMEOUT, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._devices_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
LOGGER.debug('[get_devices_endpoints] begin')
MSG = '[get_devices_endpoints] import_topology={:s}'
LOGGER.debug(MSG.format(str(import_topology)))
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception(
"Unsupported import_topology mode: {:s}".format(str(import_topology))
)
MSG = 'Unsupported import_topology mode: {:s}'
raise Exception(MSG.format(str(import_topology)))
devices = self.get(GET_DEVICES_URL, expected_status_codes={requests.codes['OK']})
result = list()
for json_device in reply.json()["devices"]:
device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"]
device_type: str = json_device["device_type"]
device_status = json_device["device_operational_status"]
device_url = "/devices/device[{:s}]".format(device_uuid)
for json_device in devices['devices']:
device_uuid : str = json_device['device_id']['device_uuid']['uuid']
device_type : str = json_device['device_type']
#if not device_type.startswith('emu-'): device_type = 'emu-' + device_type
device_status = json_device['device_operational_status']
device_url = '/devices/device[{:s}]'.format(device_uuid)
device_data = {
"uuid": json_device["device_id"]["device_uuid"]["uuid"],
"name": json_device["name"],
"type": device_type,
"status": MAPPING_STATUS[device_status],
"drivers": [
MAPPING_DRIVER[driver] for driver in json_device["device_drivers"]
'uuid': json_device['device_id']['device_uuid']['uuid'],
'name': json_device['name'],
'type': device_type,
'status': MAPPING_STATUS[device_status],
'drivers': [
MAPPING_DRIVER[driver]
for driver in json_device['device_drivers']
],
}
result.append((device_url, device_data))
for json_endpoint in json_device["device_endpoints"]:
endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"]
endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid)
for json_endpoint in json_device['device_endpoints']:
endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid']
endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid)
endpoint_data = {
"device_uuid": device_uuid,
"uuid": endpoint_uuid,
"name": json_endpoint["name"],
"type": json_endpoint["endpoint_type"],
'device_uuid': device_uuid,
'uuid': endpoint_uuid,
'name': json_endpoint['name'],
'type': json_endpoint['endpoint_type'],
}
result.append((endpoint_url, endpoint_data))
if import_topology == ImportTopologyEnum.DEVICES:
LOGGER.debug("[get_devices_endpoints] devices only; returning")
LOGGER.debug('[get_devices_endpoints] devices only; returning')
return result
reply = requests.get(self._links_url, timeout=TIMEOUT, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._links_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
for json_link in reply.json()["links"]:
link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"]
link_url = "/links/link[{:s}]".format(link_uuid)
links = self.get(GET_LINKS_URL, expected_status_codes={requests.codes['OK']})
for json_link in links['links']:
link_uuid : str = json_link['link_id']['link_uuid']['uuid']
link_url = '/links/link[{:s}]'.format(link_uuid)
link_endpoint_ids = [
(
json_endpoint_id["device_id"]["device_uuid"]["uuid"],
json_endpoint_id["endpoint_uuid"]["uuid"],
json_endpoint_id['device_id']['device_uuid']['uuid'],
json_endpoint_id['endpoint_uuid']['uuid'],
)
for json_endpoint_id in json_link["link_endpoint_ids"]
for json_endpoint_id in json_link['link_endpoint_ids']
]
link_data = {
"uuid": json_link["link_id"]["link_uuid"]["uuid"],
"name": json_link["name"],
"endpoints": link_endpoint_ids,
'uuid': json_link['link_id']['link_uuid']['uuid'],
'name': json_link['name'],
'endpoints': link_endpoint_ids,
}
result.append((link_url, link_data))
LOGGER.debug("[get_devices_endpoints] topology; returning")
LOGGER.debug('[get_devices_endpoints] topology; returning')
return result
def create_connectivity_service(self, l3vpn_data: dict) -> None:
def create_connectivity_service(self, l3vpn_data : dict) -> None:
MSG = '[create_connectivity_service] l3vpn_data={:s}'
LOGGER.debug(MSG.format(str(l3vpn_data)))
try:
requests.post(self._l3vpn_url, json=l3vpn_data)
LOGGER.debug(
"[create_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data))
)
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
def update_connectivity_service(self, l3vpn_data: dict) -> None:
vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']["vpn-services"]["vpn-service"][0]["vpn-id"]
url = self._l3vpn_url + f"/vpn-service={vpn_id}"
self.post(L3VPN_URL, body=l3vpn_data)
except requests.exceptions.ConnectionError as e:
MSG = 'Failed to send POST request to TFS L3VPN NBI'
raise Exception(MSG) from e
def update_connectivity_service(self, l3vpn_data : dict) -> None:
MSG = '[update_connectivity_service] l3vpn_data={:s}'
LOGGER.debug(MSG.format(str(l3vpn_data)))
vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']['vpn-services']['vpn-service'][0]['vpn-id']
try:
requests.put(url, json=l3vpn_data)
LOGGER.debug(
"[update_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data))
)
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
def delete_connectivity_service(self, service_uuid: str) -> None:
url = self._l3vpn_url + f"/vpn-service={service_uuid}"
self.put(L3VPN_URL + f'/vpn-service={vpn_id}', body=l3vpn_data)
except requests.exceptions.ConnectionError as e:
MSG = 'Failed to send PUT request to TFS L3VPN NBI'
raise Exception(MSG) from e
def delete_connectivity_service(self, service_uuid : str) -> None:
url = L3VPN_URL + f'/vpn-service={service_uuid}'
MSG = '[delete_connectivity_service] url={:s}'
LOGGER.debug(MSG.format(str(url)))
try:
requests.delete(url, auth=self._auth)
LOGGER.debug("[delete_connectivity_service] url={:s}".format(str(url)))
except requests.exceptions.ConnectionError:
raise Exception("faild to send delete request to TFS L3VPN NBI")
self.delete(url)
except requests.exceptions.ConnectionError as e:
MSG = 'Failed to send DELETE request to TFS L3VPN NBI'
raise Exception(MSG) from e
......@@ -12,37 +12,44 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests, threading
from requests.auth import HTTPBasicAuth
import json, logging, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
from .Tools import find_key, add_lightpath, del_lightpath, get_lightpaths
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from device.service.drivers.ietf_l2vpn.TfsApiClient import TfsApiClient
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_SERVICES
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum, get_import_topology
from .TfsApiClient import TfsApiClient
#from .TfsOpticalClient import TfsOpticalClient
LOGGER = logging.getLogger(__name__)
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
]
DRIVER_NAME = 'optical_tfs'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class OpticalTfsDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
def __init__(self, address : str, port : str, **settings) -> None:
super().__init__(DRIVER_NAME, address, int(port), **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
username = self.settings.get('username')
username = self.settings.get('username')
password = self.settings.get('password')
self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
scheme = self.settings.get('scheme', 'http')
self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password)
self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120))
scheme = self.settings.get('scheme', 'http')
timeout = int(self.settings.get('timeout', 60))
self.tac = TfsApiClient(
self.address, self.port, scheme=scheme, username=username,
password=password, timeout=timeout
)
#self.toc = TfsOpticalClient(
# self.address, int(self.port), scheme=scheme, username=username,
# password=password, timeout=timeout
#)
# Options are:
# disabled --> just import endpoints as usual
......@@ -51,19 +58,14 @@ class OpticalTfsDriver(_Driver):
# topology --> imports sub-devices and links connecting them.
# (not supported by XR driver)
self.__import_topology = get_import_topology(self.settings, default=ImportTopologyEnum.TOPOLOGY)
def Connect(self) -> bool:
url = self.__base_url + '/OpticalTFS/GetLightpaths'
with self.__lock:
if self.__started.is_set(): return True
try:
requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__tapi_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__tapi_root)))
self.tac.check_credentials()
except: # pylint: disable=bare-except
LOGGER.exception('Exception checking credentials')
return False
else:
self.__started.set()
......@@ -80,72 +82,91 @@ class OpticalTfsDriver(_Driver):
return []
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
def GetConfig(
self, resource_keys : List[str] = []
) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
results = []
with self.__lock:
self.tac.check_credentials()
if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
chk_string(str_resource_name, resource_key, allow_empty=False)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(self.tac.get_devices_endpoints(self.__import_topology))
# results.extend(get_lightpaths(
# self.__base_url, resource_key, timeout=self.__timeout, auth=self.__auth))
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(self.tac.get_devices_endpoints(self.__import_topology))
elif resource_key == RESOURCE_SERVICES:
# return all services through
results.extend(self.tac.get_services())
else:
MSG = 'ResourceKey({:s}) not implemented'
LOGGER.warning(MSG.format(str(resource_key)))
except Exception as e:
MSG = 'Unhandled error processing {:s}: resource_key({:s})'
LOGGER.exception(MSG.format(str_resource_name, str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
def SetConfig(
self, resources : List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
if len(resources) == 0: return results
with self.__lock:
for _, resource in resources:
self.tac.check_credentials()
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = add_lightpath(self.__base_url, src_node, dst_node, bitrate,
auth=self.__auth, timeout=self.__timeout)
results.extend(response)
resource_key, resource_value = resource
try:
resource_value = json.loads(resource_value)
self.tac.setup_service(resource_value)
results.append((resource_key, True))
except Exception as e:
MSG = 'Unhandled error processing resource_key({:s})'
LOGGER.exception(MSG.format(str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
def DeleteConfig(
self, resources : List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
if len(resources) == 0: return results
with self.__lock:
for _, resource in resources:
self.tac.check_credentials()
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
flow_id = find_key(resource, 'flow_id')
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = del_lightpath(self.__base_url, flow_id, src_node, dst_node, bitrate)
results.extend(response)
resource_key,resource_value = resource
try:
resource_value = json.loads(resource_value)
self.tac.teardown_service(resource_value)
results.append((resource_key, True))
except Exception as e:
MSG = 'Unhandled error processing resource_key({:s})'
LOGGER.exception(MSG.format(str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Optical TFS does not support monitoring by now
def SubscribeState(
self, subscriptions : List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: does not support monitoring by now
return [False for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Optical TFS does not support monitoring by now
def UnsubscribeState(
self, subscriptions : List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# Optical TFS does not support monitoring by now
# TODO: does not support monitoring by now
return []
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
from common.tools.client.RestClient import RestClient
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import json_service
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
# Endpoint templates of the TFS NBI ('/tfs-api'); the placeholders are filled
# via str.format() with the context/topology/service UUIDs.
CONTEXT_IDS_URL = '/tfs-api/context_ids'
TOPOLOGY_URL = '/tfs-api/context/{context_uuid:s}/topology_details/{topology_uuid:s}'
SERVICES_URL = '/tfs-api/context/{context_uuid:s}/services'
SERVICE_URL = '/tfs-api/context/{context_uuid:s}/service/{service_uuid:s}'
# Map device operational-status enum names, as serialized by the NBI, to their
# numeric protobuf values.
MAPPING_STATUS = {
    'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
    'DEVICEOPERATIONALSTATUS_DISABLED' : 1,
    'DEVICEOPERATIONALSTATUS_ENABLED'  : 2,
}
# Map device-driver enum names, as serialized by the NBI, to their numeric
# protobuf values.
MAPPING_DRIVER = {
    'DEVICEDRIVER_UNDEFINED'            : 0,
    'DEVICEDRIVER_OPENCONFIG'           : 1,
    'DEVICEDRIVER_TRANSPORT_API'        : 2,
    'DEVICEDRIVER_P4'                   : 3,
    'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4,
    'DEVICEDRIVER_ONF_TR_532'           : 5,
    'DEVICEDRIVER_XR'                   : 6,
    'DEVICEDRIVER_IETF_L2VPN'           : 7,
    'DEVICEDRIVER_GNMI_OPENCONFIG'      : 8,
    'DEVICEDRIVER_OPTICAL_TFS'          : 9,
    'DEVICEDRIVER_IETF_ACTN'            : 10,
    'DEVICEDRIVER_OC'                   : 11,
    'DEVICEDRIVER_QKD'                  : 12,
    'DEVICEDRIVER_IETF_L3VPN'           : 13,
    'DEVICEDRIVER_IETF_SLICE'           : 14,
    'DEVICEDRIVER_NCE'                  : 15,
}
LOGGER = logging.getLogger(__name__)
class TfsApiClient(RestClient):
    """Client for the TeraFlowSDN NBI ('/tfs-api') of a remote TFS instance.

    Used by the Optical TFS device driver to discover the remote topology
    (devices, endpoints, links, optical links) and to manage the lifecycle of
    optical connectivity services (create, update, delete, list).
    """

    def __init__(
        self, address : str, port : int, scheme : str = 'http',
        username : Optional[str] = None, password : Optional[str] = None,
        timeout : Optional[int] = 30
    ) -> None:
        super().__init__(
            address, port, scheme=scheme, username=username, password=password,
            timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
        )

    def check_credentials(self) -> None:
        # A successful GET on the context_ids collection proves both remote
        # reachability and, when configured, the validity of the credentials.
        self.get(CONTEXT_IDS_URL)
        LOGGER.info('Credentials checked')

    def get_devices_endpoints(
        self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
    ) -> List[Dict]:
        """Retrieve the remote topology as (resource_key, resource_data) tuples.

        Depending on import_topology, returns only devices and their endpoints
        (DEVICES) or also the links and optical links interconnecting them
        (TOPOLOGY). Raises Exception when import_topology is DISABLED, since
        this driver requires at least the device list.
        """
        LOGGER.debug('[get_devices_endpoints] begin')
        MSG = '[get_devices_endpoints] import_topology={:s}'
        LOGGER.debug(MSG.format(str(import_topology)))

        if import_topology == ImportTopologyEnum.DISABLED:
            MSG = 'Unsupported import_topology mode: {:s}'
            raise Exception(MSG.format(str(import_topology)))

        topology = self.get(TOPOLOGY_URL.format(
            context_uuid=DEFAULT_CONTEXT_NAME, topology_uuid=DEFAULT_TOPOLOGY_NAME
        ))

        result = list()
        for json_device in topology['devices']:
            device_uuid : str = json_device['device_id']['device_uuid']['uuid']
            device_type : str = json_device['device_type']
            device_status = json_device['device_operational_status']
            device_url = '/devices/device[{:s}]'.format(device_uuid)
            device_data = {
                'uuid': device_uuid,
                'name': json_device['name'],
                'type': device_type,
                'status': MAPPING_STATUS[device_status],
                'drivers': [
                    MAPPING_DRIVER[driver]
                    for driver in json_device['device_drivers']
                ],
            }
            result.append((device_url, device_data))

            for json_endpoint in json_device['device_endpoints']:
                endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid']
                endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid)
                endpoint_data = {
                    'device_uuid': device_uuid,
                    'uuid': endpoint_uuid,
                    'name': json_endpoint['name'],
                    'type': json_endpoint['endpoint_type'],
                }
                result.append((endpoint_url, endpoint_data))

        if import_topology == ImportTopologyEnum.DEVICES:
            LOGGER.debug('[get_devices_endpoints] devices only; returning')
            return result

        # Regular links and optical links share the same wire format; parse
        # both collections with the same helper to avoid code duplication.
        for json_link in topology['links']:
            result.append(TfsApiClient._parse_link(json_link))
        for json_link in topology['optical_links']:
            result.append(TfsApiClient._parse_link(json_link))

        LOGGER.debug('[get_devices_endpoints] topology; returning')
        return result

    @staticmethod
    def _parse_link(json_link : Dict) -> Tuple[str, Dict]:
        # Convert a TFS NBI link descriptor into a (resource_key, data) tuple.
        link_uuid : str = json_link['link_id']['link_uuid']['uuid']
        link_url = '/links/link[{:s}]'.format(link_uuid)
        link_endpoint_ids = [
            (
                json_endpoint_id['device_id']['device_uuid']['uuid'],
                json_endpoint_id['endpoint_uuid']['uuid'],
            )
            for json_endpoint_id in json_link['link_endpoint_ids']
        ]
        link_data = {
            'uuid': link_uuid,
            'name': json_link['name'],
            'endpoints': link_endpoint_ids,
        }
        return link_url, link_data

    def setup_service(self, resource_value : Dict) -> None:
        """Create and provision an optical connectivity service on the remote TFS.

        Follows the usual two-step TFS workflow: POST a bare PLANNED service,
        then PUT the full definition (endpoints + constraints) to trigger the
        actual provisioning.
        """
        service_uuid      = resource_value['service_uuid'     ]
        service_name      = resource_value['service_name'     ]
        src_device_uuid   = resource_value['src_device_uuid'  ]
        src_endpoint_uuid = resource_value['src_endpoint_uuid']
        dst_device_uuid   = resource_value['dst_device_uuid'  ]
        dst_endpoint_uuid = resource_value['dst_endpoint_uuid']
        bitrate           = resource_value['bitrate'          ]
        bidir             = resource_value['bidir'            ]
        # ob_width is only meaningful for the optical-band service handled
        # below; use .get() so regular services may legitimately omit it.
        ob_width          = resource_value.get('ob_width')

        endpoint_ids = [
            json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
            json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
        ]
        constraints = [
            json_constraint_custom('bandwidth[gbps]', str(bitrate)),
            json_constraint_custom('bidirectionality', '1' if bidir else '0'),
        ]
        if service_name == 'IP1/PORT-xe1==IP2/PORT-xe1':
            constraints.append(json_constraint_custom('optical-band-width[GHz]', str(ob_width)))

        service_add = json_service(
            service_uuid,
            ServiceTypeEnum.Name(ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY),
            context_id = json_context_id(DEFAULT_CONTEXT_NAME),
            name = service_name,
            status = ServiceStatusEnum.Name(ServiceStatusEnum.SERVICESTATUS_PLANNED),
        )
        services_url = SERVICES_URL.format(context_uuid=DEFAULT_CONTEXT_NAME)
        service_ids = self.post(services_url, body=service_add)
        assert len(service_ids) == 1
        service_id = service_ids[0]
        # The POST reply carries the canonical UUID assigned by the remote TFS.
        service_uuid = service_id['service_uuid']['uuid']

        # Pass the enum *name* here too, consistently with service_add above
        # (the previous revision passed the raw numeric enum value).
        service_upd = json_service(
            service_uuid,
            ServiceTypeEnum.Name(ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY),
            context_id = json_context_id(DEFAULT_CONTEXT_NAME),
            name = service_name, endpoint_ids = endpoint_ids, constraints = constraints,
            status = ServiceStatusEnum.Name(ServiceStatusEnum.SERVICESTATUS_PLANNED),
        )
        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        self.put(service_url, body=service_upd)

    def teardown_service(self, resource_value : Dict) -> None:
        """Delete the optical connectivity service identified by resource_value."""
        service_uuid = resource_value['service_uuid']
        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        # BUGFIX: the previous revision issued a second DELETE on the same URL
        # for service 'IP1/PORT-xe1==IP2/PORT-xe1'; a single DELETE suffices
        # and avoids a guaranteed failure on the duplicate request.
        self.delete(service_url)

    @staticmethod
    def parse_service(service : Dict) -> Tuple[str, Dict]:
        """Convert a TFS NBI service descriptor into a (resource_key, data) tuple.

        The parsed dict mirrors the structure consumed by setup_service(); the
        bitrate/bidir/ob_width fields are only present when the corresponding
        custom constraint exists in the descriptor.
        """
        service_uuid = service['service_id']['service_uuid']['uuid']
        src_endpoint_id = service['service_endpoint_ids'][ 0]
        dst_endpoint_id = service['service_endpoint_ids'][-1]
        parsed_service = {
            'service_uuid'     : service_uuid,
            'service_name'     : service['name'],
            'src_device_uuid'  : src_endpoint_id['device_id']['device_uuid']['uuid'],
            'src_endpoint_uuid': src_endpoint_id['endpoint_uuid']['uuid'],
            'dst_device_uuid'  : dst_endpoint_id['device_id']['device_uuid']['uuid'],
            'dst_endpoint_uuid': dst_endpoint_id['endpoint_uuid']['uuid'],
        }
        for constraint in service.get('service_constraints', list()):
            if 'custom' not in constraint: continue
            constraint_type  = constraint['custom']['constraint_type']
            constraint_value = constraint['custom']['constraint_value']
            if constraint_type == 'bandwidth[gbps]':
                parsed_service['bitrate'] = int(float(constraint_value))
            if constraint_type == 'bidirectionality':
                parsed_service['bidir'] = int(constraint_value) == 1
            if constraint_type == 'optical-band-width[GHz]':
                parsed_service['ob_width'] = int(constraint_value)
        resource_key = '/services/service[{:s}]'.format(service_uuid)
        return resource_key, parsed_service

    def get_services(self) -> List[Tuple[str, Dict]]:
        """List all optical connectivity services as (resource_key, data) tuples."""
        services_url = SERVICES_URL.format(context_uuid=DEFAULT_CONTEXT_NAME)
        _services = self.get(services_url)
        # Depending on serialization, service_type may arrive as the enum name
        # or as its numeric value; accept both.
        OPTICAL_CONNECTIVITY_SERVICE_TYPES = {
            'SERVICETYPE_OPTICAL_CONNECTIVITY',
            ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
        }
        return [
            TfsApiClient.parse_service(service)
            for service in _services['services']
            if service['service_type'] in OPTICAL_CONNECTIVITY_SERVICE_TYPES
        ]

    def get_service(self, service_uuid : str) -> Tuple[str, Dict]:
        """Retrieve and parse a single service by its UUID."""
        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        service = self.get(service_url)
        return TfsApiClient.parse_service(service)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, requests
from typing import Dict, List, Optional, Union
from common.tools.client.RestClient import RestClient
LOGGER = logging.getLogger(__name__)
# Endpoint templates of the legacy '/OpticalTFS' REST API; the lightpath
# templates embed path parameters that are filled via str.format() and must
# therefore receive str values (the placeholders use the ':s' presentation).
GET_OPTICAL_LINKS_URL = '/OpticalTFS/GetLinks'
GET_LIGHTPATHS_URL = '/OpticalTFS/GetLightpaths'
ADD_LIGHTPATH_URL = '/OpticalTFS/AddLightpath/{src_node:s}/{dst_node:s}/{bitrate:s}'
DEL_LIGHTPATH_URL = '/OpticalTFS/DelLightpath/{flow_id:s}/{src_node:s}/{dst_node:s}/{bitrate:s}'
class TfsOpticalClient(RestClient):
    """Client for the legacy '/OpticalTFS' REST API of a remote optical controller.

    Provides lightpath management (list/add/delete) and optical link retrieval.
    All query methods return the decoded reply on success or the raised
    Exception on failure, matching the driver's (resource_key, value-or-error)
    convention.
    """

    def __init__(
        self, address : str, port : int, scheme : str = 'http',
        username : Optional[str] = None, password : Optional[str] = None,
        timeout : Optional[int] = 30
    ) -> None:
        super().__init__(
            address, port, scheme=scheme, username=username, password=password,
            timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
        )

    def check_credentials(self) -> None:
        # Listing lightpaths succeeds only when the controller is reachable and
        # the configured credentials (if any) are valid.
        self.get(GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']})
        LOGGER.info('Credentials checked')

    def get_optical_links(self) -> Union[List[Dict], Exception]:
        """Return the list of optical links, or the raised Exception on failure."""
        try:
            return self.get(GET_OPTICAL_LINKS_URL, expected_status_codes={requests.codes['OK']})
        except Exception as e:
            LOGGER.exception('Exception retrieving optical links')
            return e

    def get_lightpaths(self) -> Union[List[Dict], Exception]:
        """Return lightpaths as (resource_key, lightpath) tuples, or the Exception."""
        try:
            lightpaths : List[Dict] = self.get(
                GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']}
            )
        except Exception as e:
            LOGGER.exception('Exception retrieving lightpaths')
            return e

        result = []
        for lightpath in lightpaths:
            # Each entry must carry the fields used to compose the resource key
            # and to later delete the lightpath.
            assert 'flow_id' in lightpath
            assert 'src'     in lightpath
            assert 'dst'     in lightpath
            assert 'bitrate' in lightpath
            resource_key = '/lightpaths/lightpath[{:s}]'.format(lightpath['flow_id'])
            result.append((resource_key, lightpath))
        return result

    def add_lightpath(
        self, src_node : str, dst_node : str, bitrate : int
    ) -> Union[List[Dict], Exception]:
        """Request a new lightpath between src_node and dst_node."""
        MSG = 'Add Lightpath: {:s} <-> {:s} with {:d} bitrate'
        LOGGER.info(MSG.format(str(src_node), str(dst_node), int(bitrate)))
        # The URL template uses '{...:s}' placeholders, which reject non-str
        # values; cast everything to str (bitrate may arrive as an int).
        request_endpoint = ADD_LIGHTPATH_URL.format(
            src_node=str(src_node), dst_node=str(dst_node), bitrate=str(bitrate)
        )
        expected_status_codes = {requests.codes['CREATED'], requests.codes['NO_CONTENT']}
        try:
            return self.put(request_endpoint, expected_status_codes=expected_status_codes)
        except Exception as e:
            MSG = 'Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'
            LOGGER.exception(MSG.format(str(src_node), str(dst_node), str(bitrate)))
            return e

    def del_lightpath(
        self, flow_id : str, src_node : str, dst_node : str, bitrate : int
    ) -> Union[List[Dict], Exception]:
        """Delete an existing lightpath identified by flow_id."""
        MSG = 'Delete Lightpath {:s}: {:s} <-> {:s} with {:d} bitrate'
        LOGGER.info(MSG.format(str(flow_id), str(src_node), str(dst_node), int(bitrate)))
        # BUGFIX: 'flow_id' was missing from the format() call, which raised
        # KeyError on every delete; also cast all values to str for the
        # '{...:s}' placeholders.
        request_endpoint = DEL_LIGHTPATH_URL.format(
            flow_id=str(flow_id), src_node=str(src_node), dst_node=str(dst_node),
            bitrate=str(bitrate)
        )
        expected_status_codes = {requests.codes['NO_CONTENT']}
        try:
            return self.delete(request_endpoint, expected_status_codes=expected_status_codes)
        except Exception as e:
            MSG = 'Exception deleting Lightpath {:s}: {:s} <-> {:s} with {:s} bitrate'
            LOGGER.exception(MSG.format(str(flow_id), str(src_node), str(dst_node), str(bitrate)))
            return e
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Optional
LOGGER = logging.getLogger(__name__)
# HTTP status codes treated as success by the helper functions in this module.
HTTP_OK_CODES = {
    200, # OK
    201, # Created
    202, # Accepted
    204, # No Content
}
def find_key(resource, key):
    """Return field *key* from the JSON-encoded value of a (key, value) resource tuple."""
    _, raw_value = resource
    decoded = json.loads(raw_value)
    return decoded[key]
def get_lightpaths(root_url : str, resource_key : str, auth : Optional[HTTPBasicAuth] = None,
                   timeout : Optional[int] = None):
    """Fetch all lightpaths from the '/OpticalTFS' API.

    Returns a list of ('/flows/flow[<id>]', flow_dict) tuples on success; on
    failure (other than a timeout) the list contains (resource_key, exception).
    """
    url = '{:s}/OpticalTFS/GetLightpaths'.format(root_url)
    headers = {'accept': 'application/json'}
    result = []
    try:
        reply = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
    except requests.exceptions.Timeout:
        LOGGER.exception('Timeout connecting {:s}'.format(url))
        return result
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
        result.append((resource_key, e))
        return result

    try:
        flows = json.loads(reply.content)
    except Exception as e: # pylint: disable=broad-except
        LOGGER.warning('Unable to decode reply: {:s}'.format(str(reply.content)))
        result.append((resource_key, e))
        return result

    for flow in flows:
        # Project out the fields the driver consumes; missing fields map to None.
        flow_entry = {field: flow.get(field) for field in ('flow_id', 'src', 'dst', 'bitrate')}
        flow_url = '/flows/flow[{:s}]'.format(flow_entry['flow_id'])
        result.append((flow_url, flow_entry))
    return result
def add_lightpath(root_url, src_node, dst_node, bitrate,
                  auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
    """Request a new lightpath between src_node and dst_node.

    Returns a list containing the decoded JSON reply followed by a boolean
    reporting whether the HTTP status was a success code; on exception the
    list contains only the raised exception.
    """
    headers = {'accept': 'application/json'}
    # Cast all path parameters to str: the '{:s}' placeholders raise ValueError
    # for non-str values (e.g. when bitrate arrives as an int).
    url = '{:s}/OpticalTFS/AddLightpath/{:s}/{:s}/{:s}'.format(
        root_url, str(src_node), str(dst_node), str(bitrate))
    results = []
    try:
        LOGGER.info('Lightpath request: {:s} <-> {:s} with {:s} bitrate'.format(
            str(src_node), str(dst_node), str(bitrate)))
        response = requests.put(url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
        results.append(response.json())
        LOGGER.info('Response: {:s}'.format(str(response)))
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'.format(
            str(src_node), str(dst_node), str(bitrate)))
        results.append(e)
    else:
        if response.status_code not in HTTP_OK_CODES:
            # Fixed log template: the closing parenthesis was missing.
            msg = 'Could not create Lightpath(status_code={:s} reply={:s})'
            LOGGER.error(msg.format(str(response.status_code), str(response)))
        results.append(response.status_code in HTTP_OK_CODES)
    return results
def del_lightpath(root_url, flow_id, src_node, dst_node, bitrate,
                  auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
    """Delete the lightpath identified by flow_id.

    Returns a list containing either the raised exception or a boolean that
    reports whether the HTTP status was a success code.
    """
    # Cast all path parameters to str: the '{:s}' placeholders raise ValueError
    # for non-str values (e.g. when flow_id or bitrate arrive as ints).
    url = '{:s}/OpticalTFS/DelLightpath/{:s}/{:s}/{:s}/{:s}'.format(
        root_url, str(flow_id), str(src_node), str(dst_node), str(bitrate))
    headers = {'accept': 'application/json'}
    results = []
    try:
        response = requests.delete(
            url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Exception deleting Lightpath(uuid={:s})'.format(str(flow_id)))
        results.append(e)
    else:
        if response.status_code not in HTTP_OK_CODES:
            msg = 'Could not delete Lightpath(flow_id={:s}). status_code={:s} reply={:s}'
            LOGGER.error(msg.format(str(flow_id), str(response.status_code), str(response)))
        results.append(response.status_code in HTTP_OK_CODES)
    return results
def get_topology(root_url : str, resource_key : str, auth : Optional[HTTPBasicAuth] = None,
                 timeout : Optional[int] = None):
    """Fetch the optical links topology from the '/OpticalTFS' API.

    Returns a single-element list with the decoded JSON reply on success; on
    failure (other than a timeout) the list contains (resource_key, exception).
    """
    url = '{:s}/OpticalTFS/GetLinks'.format(root_url)
    headers = {'accept': 'application/json'}
    result = []
    try:
        reply = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
    except requests.exceptions.Timeout:
        LOGGER.exception('Timeout connecting {:s}'.format(url))
        return result
    except Exception as e: # pylint: disable=broad-except
        LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
        result.append((resource_key, e))
        return result

    try:
        links = json.loads(reply.content)
    except Exception as e: # pylint: disable=broad-except
        LOGGER.warning('Unable to decode reply: {:s}'.format(str(reply.content)))
        result.append((resource_key, e))
        return result

    result.append(links)
    return result
......@@ -12,9 +12,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_SERVICES
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
]
......@@ -3,7 +3,7 @@ from json import dumps
import requests
from device.service.drivers.ietf_l3vpn.driver import IetfL3VpnDriver
from device.service.drivers.ietf_l3vpn.IetfL3VpnDriver import IetfL3VpnDriver
from device.service.Tools import RESOURCE_ENDPOINTS
settings = {
......
......@@ -79,6 +79,7 @@ RUN python3 -m pip install -r e2e_orchestrator/requirements.txt
# Add component files into working directory
COPY src/context/__init__.py context/__init__.py
COPY src/context/client/. context/client/
COPY src/context/service/database/uuids/. context/service/database/uuids/
COPY src/service/__init__.py service/__init__.py
COPY src/service/client/. service/client/
COPY src/e2e_orchestrator/. e2e_orchestrator/
......
......@@ -12,5 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
networkx
websockets==12.0
networkx==3.2.1
python-socketio==5.12.1
requests==2.27.*
websocket-client==1.8.0 # used by socketio to upgrade to websocket
......@@ -12,202 +12,49 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import copy, grpc, logging, networkx
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
ServiceTypeEnum, ServiceStatusEnum)
from common.proto.context_pb2 import Empty, Connection, EndPointId
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
import grpc
import json
import logging
import networkx as nx
from threading import Thread
from websockets.sync.client import connect
from websockets.sync.server import serve
from common.Constants import DEFAULT_CONTEXT_NAME
LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = True
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()
EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = str(get_setting('WS_IP_PORT'))
OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = str(get_setting('WS_E2E_PORT'))
ALL_HOSTS = "0.0.0.0"
class SubscriptionServer(Thread):
    """Thread that subscribes to the external VNT manager over WebSocket and
    then serves incoming VNT events (link creation/removal, topology dumps)."""

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        # Announce our own events-server address to the external VNT manager.
        url = "ws://" + EXT_HOST + ":" + EXT_PORT
        request = VNTSubscriptionRequest()
        request.host = OWN_HOST
        request.port = OWN_PORT
        try:
            LOGGER.debug("Trying to connect to {}".format(url))
            websocket = connect(url)
        except Exception as ex:
            # NOTE(review): 'ex' is captured but never logged; the reason for the
            # connection failure is lost.
            LOGGER.error('Error connecting to {}'.format(url))
        else:
            with websocket:
                LOGGER.debug("Connected to {}".format(url))
                send = grpc_message_to_json_string(request)
                websocket.send(send)
                LOGGER.debug("Sent: {}".format(send))
                try:
                    # Wait for the subscription acknowledgement from the remote side.
                    message = websocket.recv()
                    LOGGER.debug("Received message from WebSocket: {}".format(message))
                except Exception as ex:
                    LOGGER.error('Exception receiving from WebSocket: {}'.format(ex))
            # After the subscription handshake, serve events indefinitely.
            self._events_server()

    def _events_server(self):
        # Bind a WebSocket server on all interfaces; blocks forever serving events.
        all_hosts = "0.0.0.0"
        try:
            server = serve(self._event_received, all_hosts, int(OWN_PORT))
        except Exception as ex:
            LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
            LOGGER.error('Exception!: {}'.format(ex))
        else:
            with server:
                LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
                server.serve_forever()

    def _event_received(self, connection):
        # Dispatch each incoming JSON message by its distinguishing key:
        #   'link_id'   -> virtual link creation
        #   'link_uuid' -> virtual link removal
        #   otherwise   -> full topology dump
        LOGGER.info("EVENT received!")
        for message in connection:
            message_json = json.loads(message)
            # LOGGER.info("message_json: {}".format(message_json))
            # Link creation
            if 'link_id' in message_json:
                link = Link(**message_json)

                # Create a PLANNED optical-connectivity service named after the link.
                service = Service()
                service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
                service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
                service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
                service_client.CreateService(service)

                links = context_client.ListLinks(Empty()).links
                a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
                a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
                z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
                z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]

                # Locate the matching endpoint ids among all known links.
                for _link in links:
                    for _endpoint_id in _link.link_endpoint_ids:
                        if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
                                _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
                            a_ep_id = _endpoint_id
                        elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
                                _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
                            z_ep_id = _endpoint_id

                # NOTE(review): the 'in locals()' test can be satisfied by values left
                # over from a previous message iteration; consider initializing both
                # names to None before the search loop instead.
                if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()):
                    error_msg = 'Could not get VNT link endpoints'
                    LOGGER.error(error_msg)
                    connection.send(error_msg)
                    return

                service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
                service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
                # service_client.UpdateService(service)
                connection.send(grpc_message_to_json_string(link))
            # Link removal
            elif 'link_uuid' in message_json:
                LOGGER.info('REMOVING VIRTUAL LINK')
                link_id = LinkId(**message_json)
                service_id = ServiceId()
                service_id.service_uuid.uuid = link_id.link_uuid.uuid
                service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                # service_client.DeleteService(service_id)
                connection.send(grpc_message_to_json_string(link_id))
                context_client.RemoveLink(link_id)
            # Topology received
            else:
                LOGGER.info('TOPOLOGY')
                topology_details = TopologyDetails(**message_json)
                # Persist context, topology, then every device and link it contains.
                context = Context()
                context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
                context_client.SetContext(context)
                topology = Topology()
                topology.topology_id.context_id.CopyFrom(context.context_id)
                topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
                context_client.SetTopology(topology)
                for device in topology_details.devices:
                    context_client.SetDevice(device)
                for link in topology_details.links:
                    context_client.SetLink(link)
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
try:
LOGGER.debug("Requesting subscription")
sub_server = SubscriptionServer()
sub_server.start()
LOGGER.debug("Servicer Created")
except Exception as ex:
LOGGER.info("Exception!: {}".format(ex))
LOGGER.debug('Creating Servicer...')
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
endpoints_ids = []
for endpoint_id in request.service.service_endpoint_ids:
endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2])
def Compute(
self, request: E2EOrchestratorRequest, context: grpc.ServicerContext
) -> E2EOrchestratorReply:
endpoints_ids = [
endpoint_get_uuid(endpoint_id)[2]
for endpoint_id in request.service.service_endpoint_ids
]
graph = nx.Graph()
graph = networkx.Graph()
context_client = ContextClient()
devices = context_client.ListDevices(Empty()).devices
for device in devices:
endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid
for endpoint in device.device_endpoints]
endpoints_uuids = [
endpoint.endpoint_id.endpoint_uuid.uuid
for endpoint in device.device_endpoints
]
for ep in endpoints_uuids:
graph.add_node(ep)
for ep in endpoints_uuids:
for ep_i in endpoints_uuids:
if ep == ep_i:
for ep_i in endpoints_uuids:
for ep_j in endpoints_uuids:
if ep_i == ep_j:
continue
graph.add_edge(ep, ep_i)
graph.add_edge(ep_i, ep_j)
links = context_client.ListLinks(Empty()).links
for link in links:
......@@ -217,7 +64,9 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
graph.add_edge(eps[0], eps[1])
shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
shortest = networkx.shortest_path(
graph, endpoints_ids[0], endpoints_ids[1]
)
path = E2EOrchestratorReply()
path.services.append(copy.deepcopy(request.service))
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, json, logging, networkx, requests, threading
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from service.client.ServiceClient import ServiceClient
from websockets.sync.client import connect
from websockets.sync.server import serve
# Module-wide logger; propagate websockets/urllib3 library logs to root handlers.
LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = True
logging.getLogger("requests.packages.urllib3").propagate = True

# Metrics collector for the RPC methods of the E2EOrchestrator component.
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")

# Shared gRPC clients, reused by the WebSocket event handlers below.
context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()

# External VNT manager WebSocket endpoint (subscription target), from settings.
EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = int(get_setting('WS_IP_PORT'))
EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT)

# Address advertised for (and bound by) our own events server.
OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = int(get_setting('WS_E2E_PORT'))
ALL_HOSTS = '0.0.0.0'
class SubscriptionServer(threading.Thread):
def run(self):
    """Subscribe to the external VNT WebSocket endpoint, then serve events.

    Sends a VNTSubscriptionRequest announcing our own events-server address,
    waits for the acknowledgement message, and finally starts the local
    events server (which blocks forever).
    """
    request = VNTSubscriptionRequest()
    request.host = OWN_HOST
    # NOTE(review): OWN_PORT is an int here; confirm VNTSubscriptionRequest.port
    # is an integer proto field (it was handled as a string in earlier revisions).
    request.port = OWN_PORT
    try:
        LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL))
        websocket = connect(EXT_URL)
    except: # pylint: disable=bare-except
        LOGGER.exception('Error connecting to {:s}'.format(EXT_URL))
    else:
        with websocket:
            LOGGER.debug('Connected to {:s}'.format(EXT_URL))
            send = grpc_message_to_json_string(request)
            websocket.send(send)
            LOGGER.debug('Sent: {:s}'.format(send))
            try:
                # Wait for the subscription acknowledgement from the remote side.
                message = websocket.recv()
                LOGGER.debug('Received message from WebSocket: {:s}'.format(message))
            except Exception as ex:
                # BUG FIX: '{:s}'.format(ex) raised TypeError at runtime because
                # Exception objects reject the 's' format spec; format str(ex).
                LOGGER.error('Exception receiving from WebSocket: {:s}'.format(str(ex)))
        # After the handshake, serve incoming events indefinitely.
        self._events_server()
def _events_server(self):
    """Start a blocking WebSocket server on all interfaces and dispatch every
    incoming connection to :meth:`_event_received`. Never returns on success."""
    try:
        ws_server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT))
    except: # pylint: disable=bare-except
        # Could not bind/start the server: log and bail out.
        LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
        return
    with ws_server:
        LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
        ws_server.serve_forever()
def _event_received(self, connection):
    """Handle JSON messages arriving on the events WebSocket connection.

    Three message kinds are recognized by their distinguishing JSON key:
    - ``link_id``   : virtual link creation -> create an optical service and
                      register the link.
    - ``link_uuid`` : virtual link removal  -> delete the service and the link.
    - otherwise     : full topology dump    -> persist context, topology,
                      devices and links.
    """
    LOGGER.debug('Event received')
    for message in connection:
        message_json = json.loads(message)
        # Link creation
        if 'link_id' in message_json:
            LOGGER.debug('Link creation')
            link = Link(**message_json)

            # Create a PLANNED optical-connectivity service named after the link.
            service = Service()
            service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
            service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
            service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
            service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
            service_client.CreateService(service)

            a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
            a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
            z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
            z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]

            # Locate the matching endpoint ids among all known links.
            # BUG FIX: initialize explicitly; the previous `'a_ep_id' in locals()`
            # test could be fooled by values left over from an earlier message
            # iteration of this loop.
            a_ep_id = None
            z_ep_id = None
            links = context_client.ListLinks(Empty()).links
            for _link in links:
                for _endpoint_id in _link.link_endpoint_ids:
                    if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
                            _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
                        a_ep_id = _endpoint_id
                    elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
                            _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
                        z_ep_id = _endpoint_id

            if (a_ep_id is None) or (z_ep_id is None):
                # BUG FIX: the z label previously interpolated z_device_uuid.
                error_msg = ('Could not get VNT link endpoints'
                             f'\n\ta_endpoint_uuid= {a_endpoint_uuid}'
                             f'\n\tz_endpoint_uuid= {z_endpoint_uuid}')
                LOGGER.error(error_msg)
                connection.send(error_msg)
                return

            service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
            service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
            service_client.UpdateService(service)
            # Read-back of the updated service; result intentionally unused
            # (previously bound to an unused local 're_svc').
            context_client.GetService(service.service_id)
            connection.send(grpc_message_to_json_string(link))
            context_client.SetLink(link)
        # Link removal
        elif 'link_uuid' in message_json:
            LOGGER.debug('Link removal')
            link_id = LinkId(**message_json)
            service_id = ServiceId()
            service_id.service_uuid.uuid = link_id.link_uuid.uuid
            service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
            service_client.DeleteService(service_id)
            connection.send(grpc_message_to_json_string(link_id))
            context_client.RemoveLink(link_id)
        # Topology received
        else:
            LOGGER.debug('Topology received')
            topology_details = TopologyDetails(**message_json)
            # Persist context, topology, then every device and link it contains.
            context = Context()
            context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
            context_client.SetContext(context)
            topology = Topology()
            topology.topology_id.context_id.CopyFrom(context.context_id)
            topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
            context_client.SetTopology(topology)
            for device in topology_details.devices:
                context_client.SetDevice(device)
            for link in topology_details.links:
                context_client.SetLink(link)