Skip to content
Snippets Groups Projects
Commit ad9f57b9 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

E2E Orchestrator component:

- Changed recommendation namespace to use specific events for actions
- Corrected path computation logic
- Corrected subscription creation logic
parent 597edf16
No related branches found
No related tags found
3 merge requests!359Release TeraFlowSDN 5.0,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
......@@ -31,7 +31,7 @@ class Subscriptions:
device_uuid = tfs_ctrl_settings.device_uuid
with self._lock:
subscription = self._subscriptions.get(device_uuid)
if (subscription is not None) and subscription.is_running: return
if subscription is not None: return
subscription = Subscription(tfs_ctrl_settings, self._dispatchers, self._terminate)
self._subscriptions[device_uuid] = subscription
subscription.start()
......
......@@ -31,25 +31,60 @@ class ClientNamespace(socketio.ClientNamespace):
MSG = '[on_disconnect] Disconnected!, reason: {:s}'
LOGGER.info(MSG.format(str(reason)))
def on_recommendation(self, data):
MSG = '[on_recommendation] begin data={:s}'
def on_vlink_create(self, data):
MSG = '[on_vlink_create] begin data={:s}'
LOGGER.info(MSG.format(str(data)))
json_data = json.loads(data)
request_key = json_data.pop('_request_key')
recommendation = Recommendation(
action = RecommendationAction.VLINK_CREATE,
data = json_data,
)
result = Future()
MSG = '[on_vlink_create] Recommendation ({:s}): {:s}'
LOGGER.info(MSG.format(str(request_key), str(recommendation)))
LOGGER.debug('[on_vlink_create] Queuing recommendation...')
self._dispatcher_queue.put_nowait((recommendation, result))
reply = dict()
reply['_request_key'] = request_key
try:
reply['result'] = result.result()
event = reply['result']['event']
except Exception as e:
reply['error'] = str(e)
#reply['stacktrace'] = str(e)
event = 'error'
LOGGER.debug('[on_vlink_create] Replying...')
self.emit(event, json.dumps(reply))
LOGGER.debug('[on_vlink_create] end')
def on_vlink_remove(self, data):
MSG = '[on_vlink_remove] begin data={:s}'
LOGGER.info(MSG.format(str(data)))
json_data = json.loads(data)
request_key = json_data.pop('_request_key')
recommendation = Recommendation(
action = RecommendationAction._value2member_map_[json_data['action']],
data = json.loads(json_data['data']),
action = RecommendationAction.VLINK_REMOVE,
data = json_data,
)
result = Future()
MSG = '[on_recommendation] Recommendation: {:s}'
LOGGER.info(MSG.format(str(recommendation)))
MSG = '[on_vlink_remove] Recommendation ({:s}): {:s}'
LOGGER.info(MSG.format(str(request_key), str(recommendation)))
LOGGER.debug('[on_recommendation] Queuing recommendation...')
LOGGER.debug('[on_vlink_remove] Queuing recommendation...')
self._dispatcher_queue.put_nowait((recommendation, result))
LOGGER.debug('[on_recommendation] Recommendation processed...')
reply = dict()
reply['_request_key'] = request_key
try:
reply['result'] = result.result()
event = reply['result']['event']
......@@ -58,6 +93,6 @@ class ClientNamespace(socketio.ClientNamespace):
#reply['stacktrace'] = str(e)
event = 'error'
LOGGER.debug('[on_recommendation] Replying...')
LOGGER.debug('[on_vlink_remove] Replying...')
self.emit(event, json.dumps(reply))
LOGGER.debug('[on_recommendation] end')
LOGGER.debug('[on_vlink_remove] end')
......@@ -46,7 +46,7 @@ class RecommendationDispatcher(_Dispatcher):
vlink_optical_service['service_id']['service_uuid']['uuid'] = service_id.service_uuid.uuid
service_id = service_client.UpdateService(Service(**vlink_optical_service))
result = {'event': 'vlink-created'}
result = {'event': 'vlink_created'}
elif request.action == RecommendationAction.VLINK_REMOVE:
vlink_service_uuid = request.data['link_id']['link_uuid']['uuid']
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
......@@ -58,7 +58,7 @@ class RecommendationDispatcher(_Dispatcher):
if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
result = {'event': 'vlink-removed'}
result = {'event': 'vlink_removed'}
else:
MSG = 'RecommendationAction not supported in Recommendation({:s})'
raise NotImplementedError(MSG.format(str(request)))
......
......@@ -14,11 +14,11 @@
import logging, networkx
from dataclasses import dataclass, field
from typing import Dict, List
from typing import Dict, List, Set
from common.proto.context_pb2 import ServiceTypeEnum
from common.tools.context_queries.Topology import get_topology_details
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import json_service
......@@ -39,6 +39,21 @@ class GraphAndMapping:
endpoint_to_device_uuid : Dict[str, str] = field(default_factory=dict)
EXCLUDED_DEVICE_TYPES : Set[str] = {
DeviceTypeEnum.EMULATED_IP_SDN_CONTROLLER.value,
DeviceTypeEnum.EMULATED_MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value,
DeviceTypeEnum.EMULATED_XR_CONSTELLATION.value,
DeviceTypeEnum.IETF_SLICE.value,
DeviceTypeEnum.IP_SDN_CONTROLLER.value,
DeviceTypeEnum.MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.NCE.value,
DeviceTypeEnum.OPEN_LINE_SYSTEM.value,
DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value,
DeviceTypeEnum.XR_CONSTELLATION.value,
}
def compose_graph_from_topology() -> GraphAndMapping:
context_client = ContextClient()
topology_details = get_topology_details(
......@@ -48,12 +63,18 @@ def compose_graph_from_topology() -> GraphAndMapping:
graph_and_mapping = GraphAndMapping()
excluded_device_uuids : Set[str] = set()
for device in topology_details.devices:
device_uuid = device.device_id.device_uuid.uuid
graph_and_mapping.device_name_to_uuid.setdefault(device.name, device_uuid)
graph_and_mapping.device_name_to_uuid.setdefault(device_uuid, device_uuid)
graph_and_mapping.device_to_type.setdefault(device_uuid, device.device_type)
if device.device_type in EXCLUDED_DEVICE_TYPES:
excluded_device_uuids.add(device_uuid)
continue
endpoint_uuids = list()
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
......@@ -70,9 +91,18 @@ def compose_graph_from_topology() -> GraphAndMapping:
graph_and_mapping.graph.add_edge(endpoint_uuid_i, endpoint_uuid_j)
for link in topology_details.links:
endpoint_id_a = link.link_endpoint_ids[ 0]
endpoint_id_z = link.link_endpoint_ids[-1]
device_uuid_a = endpoint_id_a.device_id.device_uuid.uuid
if device_uuid_a in excluded_device_uuids: continue
device_uuid_z = endpoint_id_z.device_id.device_uuid.uuid
if device_uuid_z in excluded_device_uuids: continue
graph_and_mapping.graph.add_edge(
link.link_endpoint_ids[ 0].endpoint_uuid.uuid,
link.link_endpoint_ids[-1].endpoint_uuid.uuid,
endpoint_id_a.endpoint_uuid.uuid,
endpoint_id_z.endpoint_uuid.uuid,
)
return graph_and_mapping
......@@ -98,15 +128,23 @@ def compose_optical_service(vlink_request : Dict) -> Dict:
graph_and_mapping.graph, endpoint_uuid_a, endpoint_uuid_b
)
LOGGER.info('[compose_optical_service] path_hops={:s}'.format(str(path_hops)))
optical_border_endpoint_ids : List[str] = list()
for endpoint_uuid in path_hops:
LOGGER.info('[compose_optical_service] endpoint_uuid={:s}'.format(str(endpoint_uuid)))
device_uuid = graph_and_mapping.endpoint_to_device_uuid[endpoint_uuid]
LOGGER.info('[compose_optical_service] device_uuid={:s}'.format(str(device_uuid)))
device_type = graph_and_mapping.device_to_type[device_uuid]
LOGGER.info('[compose_optical_service] device_type={:s}'.format(str(device_type)))
if device_type != DeviceTypeEnum.EMULATED_OPTICAL_TRANSPONDER.value: continue
device_id = json_device_id(device_uuid)
endpoint_id = json_endpoint_id(device_id, endpoint_uuid)
LOGGER.info('[compose_optical_service] endpoint_id={:s}'.format(str(endpoint_id)))
optical_border_endpoint_ids.append(endpoint_id)
LOGGER.info('[compose_optical_service] optical_border_endpoint_ids={:s}'.format(str(optical_border_endpoint_ids)))
constraints = [
json_constraint_custom('bandwidth[gbps]', str(vlink_request['attributes']['total_capacity_gbps'])),
json_constraint_custom('bidirectionality', '1'),
......@@ -120,8 +158,10 @@ def compose_optical_service(vlink_request : Dict) -> Dict:
vlink_optical_service = json_service(
vlink_service_uuid,
ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY,
context_id=json_context(DEFAULT_CONTEXT_NAME),
endpoint_ids=optical_border_endpoint_ids,
context_id=json_context_id(DEFAULT_CONTEXT_NAME),
endpoint_ids=[
optical_border_endpoint_ids[1], optical_border_endpoint_ids[2]
],
constraints=constraints,
)
return vlink_optical_service
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment