diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index 07ece21c618e92f67efe19ffeca74ac0beffc5d5..5911ac384ec1e8e0fa307ce7ea9fedb6b40ca8e6 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -16,7 +16,7 @@ import copy from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.context_pb2 import ( - Empty, Connection, EndPointId, Link, TopologyDetails, Topology, Context, Service, ServiceTypeEnum, + Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, ServiceTypeEnum, ServiceStatusEnum) from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient @@ -96,8 +96,10 @@ class SubscriptionServer(Thread): def _event_received(self, connection): + LOGGER.info("EVENT received!") for message in connection: message_json = json.loads(message) + # LOGGER.info("message_json: {}".format(message_json)) if 'link_id' in message_json: link = Link(**message_json) @@ -115,7 +117,6 @@ class SubscriptionServer(Thread): z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] - for _link in links: for _endpoint_id in _link.link_endpoint_ids: if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ @@ -125,14 +126,29 @@ class SubscriptionServer(Thread): _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: z_ep_id = _endpoint_id + if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()): + error_msg = 'Could not get VNT link endpoints' + LOGGER.error(error_msg) + connection.send(error_msg) + return service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) - service_client.UpdateService(service) + # service_client.UpdateService(service) connection.send(grpc_message_to_json_string(link)) + elif 'link_uuid' in message_json: + LOGGER.info('REMOVING VIRTUAL LINK') + link_id = LinkId(**message_json) + + service_id = ServiceId() + service_id.service_uuid.uuid = link_id.link_uuid.uuid + service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + # service_client.DeleteService(service_id) + connection.send(grpc_message_to_json_string(link_id)) else: + LOGGER.info('TOPOLOGY') topology_details = TopologyDetails(**message_json) context = Context() @@ -140,6 +156,7 @@ class SubscriptionServer(Thread): context_client.SetContext(context) topology = Topology() + topology.topology_id.context_id.CopyFrom(context.context_id) topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid context_client.SetTopology(topology) diff --git a/src/vnt_manager/Dockerfile b/src/vnt_manager/Dockerfile index 1c7137fbe5603795e5c51cf649ff89ae95f60033..29a3c953b472eaa7daad1143058d3e704d359d58 100644 --- a/src/vnt_manager/Dockerfile +++ b/src/vnt_manager/Dockerfile @@ -79,6 +79,8 @@ RUN python3 -m pip install -r vnt_manager/requirements.txt # Add component files into working directory COPY --chown=teraflow:teraflow ./src/context/. context COPY --chown=teraflow:teraflow ./src/vnt_manager/. vnt_manager +COPY --chown=teraflow:teraflow ./src/device/. device + # Start the service ENTRYPOINT ["python", "-m", "vnt_manager.service"] diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index 1b7b85f4c27dcad6aca426669ae3c5cffb7c99c7..c2a570a9d5a946a02516b50fcba83005d8ed7934 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -41,6 +41,7 @@ from typing import Any, Dict, Set from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json +from .vntm_config_device import configure, deconfigure import json LOGGER = logging.getLogger(__name__) @@ -61,13 +62,6 @@ HOST = "10.1.1.83" PORT = str(8765) -WEBSOCKET = None - -def send_msg(msg): - try: - WEBSOCKET.send(msg) - except Exception as e: - LOGGER.info(e) class VNTMEventDispatcher(threading.Thread): @@ -86,8 +80,18 @@ class VNTMEventDispatcher(threading.Thread): def stop(self): self._terminate.set() + + def send_msg(self, msg): + try: + self.websocket.send(msg) + except Exception as e: + LOGGER.info(e) + + def recv_msg(self): + message = self.websocket.recv() + return message + def run(self) -> None: - global WEBSOCKET time.sleep(5) events_collector = EventsCollector( @@ -107,7 +111,7 @@ class VNTMEventDispatcher(threading.Thread): try: LOGGER.info("Connecting to events server...: {}".format(url)) - WEBSOCKET = connect(url) + self.websocket = connect(url) except Exception as ex: LOGGER.error('Error connecting to {}'.format(url)) else: @@ -120,17 +124,18 @@ class VNTMEventDispatcher(threading.Thread): except Exception as ex: LOGGER.warning('No topology found') else: - send_msg(grpc_message_to_json_string(topology_details)) + self.send_msg(grpc_message_to_json_string(topology_details)) while not self._terminate.is_set(): - event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + LOGGER.info('Event type: {}'.format(event)) if event is None: continue + LOGGER.debug('Received event: {}'.format(event)) topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) to_send = grpc_message_to_json_string(topology_details) - send_msg(to_send) + self.send_msg(to_send) LOGGER.info('Exiting') events_collector.stop() @@ -149,11 +154,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): reply = VNTSubscriptionReply() reply.subscription = "OK" - event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) + self.event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) self.host = request.host self.port = request.port - event_dispatcher.start() + self.event_dispatcher.start() return reply @@ -176,17 +181,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: try: - send_msg(grpc_message_to_json_string(request)) - message = WEBSOCKET.recv() - - - - - - - - - message_json = json.loads(message) + LOGGER.info('SETTING virtual link') + self.event_dispatcher.send_msg(grpc_message_to_json_string(request)) + # configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1') + response = self.event_dispatcher.recv_msg() + message_json = json.loads(response) link = Link(**message_json) context_client.SetLink(link) except Exception as e: @@ -195,6 +194,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: - # TODO + try: + self.event_dispatcher.send_msg(grpc_message_to_json_string(request)) + # deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1') + response = self.event_dispatcher.recv_msg() + message_json = json.loads(response) + link_id = LinkId(**message_json) + context_client.RemoveLink(link_id) + + LOGGER.info('Removed') + except Exception as e: + msg_error = 'Exception removing virtual link={}\n\t{}'.format(request.link_uuid.uuid, e) + LOGGER.error(msg_error) + return msg_error + else: + context_client.RemoveLink(request) + LOGGER.info('Removed') + return Empty() - + diff --git a/src/vnt_manager/service/vntm_config_device.py b/src/vnt_manager/service/vntm_config_device.py new file mode 100644 index 0000000000000000000000000000000000000000..4735ed31f185ba221033a7611b2b1af3f90c1688 --- /dev/null +++ b/src/vnt_manager/service/vntm_config_device.py @@ -0,0 +1,184 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict +from common.proto.context_pb2 import ConfigRule +from common.tools.context_queries.Device import get_device +from common.tools.object_factory.ConfigRule import json_config_rule_set, json_config_rule_delete +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient + +##### Config Rule Composers #################################################### + +def compose_config_rule(resource_key, resource_value, delete) -> Dict: + json_config_rule = json_config_rule_delete if delete else json_config_rule_set + return ConfigRule(**json_config_rule(resource_key, resource_value)) + +def network_instance(ni_name, ni_type, ni_router_id=None, ni_route_distinguisher=None, delete=False) -> Dict: + path = '/network_instance[{:s}]'.format(ni_name) + data = {'name': ni_name, 'type': ni_type} + if ni_router_id is not None: data['router_id'] = ni_router_id + if ni_route_distinguisher is not None: data['route_distinguisher'] = ni_route_distinguisher + return compose_config_rule(path, data, delete) + +def network_instance_add_protocol_bgp(ni_name, ni_type, ni_router_id, ni_bgp_as, neighbors=[], delete=False)-> Dict: + path = '/network_instance[{:s}]/protocols[BGP]'.format(ni_name) + data = { + 'name': ni_name, 'type': ni_type, 'router_id': ni_router_id, 'identifier': 'BGP', + 'protocol_name': ni_bgp_as, 'as': ni_bgp_as + } + if len(neighbors) > 0: + data['neighbors'] = [ + {'ip_address': neighbor_ip_address, 'remote_as': neighbor_remote_as} + for neighbor_ip_address, neighbor_remote_as in neighbors + ] + return compose_config_rule(path, data, delete) + +def network_instance_add_protocol_direct(ni_name, ni_type, delete=False) -> Dict: + path = '/network_instance[{:s}]/protocols[DIRECTLY_CONNECTED]'.format(ni_name) + data = { + 'name': ni_name, 'type': ni_type, 'identifier': 'DIRECTLY_CONNECTED', + 'protocol_name': 'DIRECTLY_CONNECTED' + } + return compose_config_rule(path, data, delete) + +def network_instance_add_protocol_static(ni_name, ni_type, delete=False) -> Dict: + path = '/network_instance[{:s}]/protocols[STATIC]'.format(ni_name) + data = { + 'name': ni_name, 'type': ni_type, 'identifier': 'STATIC', + 'protocol_name': 'STATIC' + } + return compose_config_rule(path, data, delete) + +def network_instance_add_table_connection( + ni_name, src_protocol, dst_protocol, address_family, default_import_policy, bgp_as=None, delete=False +) -> Dict: + path = '/network_instance[{:s}]/table_connections[{:s}][{:s}][{:s}]'.format( + ni_name, src_protocol, dst_protocol, address_family + ) + data = { + 'name': ni_name, 'src_protocol': src_protocol, 'dst_protocol': dst_protocol, + 'address_family': address_family, 'default_import_policy': default_import_policy, + } + if bgp_as is not None: data['as'] = bgp_as + return compose_config_rule(path, data, delete) + +def interface( + name, index, description=None, if_type=None, vlan_id=None, mtu=None, ipv4_address_prefix=None, + enabled=None, delete=False +) -> Dict: + path = '/interface[{:s}]/subinterface[{:d}]'.format(name, index) + data = {'name': name, 'index': index} + if description is not None: data['description'] = description + if if_type is not None: data['type' ] = if_type + if vlan_id is not None: data['vlan_id' ] = vlan_id + if mtu is not None: data['mtu' ] = mtu + if enabled is not None: data['enabled' ] = enabled + if ipv4_address_prefix is not None: + ipv4_address, ipv4_prefix = ipv4_address_prefix + data['address_ip' ] = ipv4_address + data['address_prefix'] = ipv4_prefix + return compose_config_rule(path, data, delete) + +def network_instance_interface(ni_name, ni_type, if_name, if_index, delete=False) -> Dict: + path = '/network_instance[{:s}]/interface[{:s}.{:d}]'.format(ni_name, if_name, if_index) + data = {'name': ni_name, 'type': ni_type, 'id': if_name, 'interface': if_name, 'subinterface': if_index} + return compose_config_rule(path, data, delete) + +# configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1') +# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1') + +def configure(router_a, port_a, router_b, port_b, ni_name): + context_client = ContextClient() + device_client = DeviceClient() + + client_if_name = 'ce1' + client_if_addr = {'CSGW1': ('192.168.10.1', 24), 'CSGW2': ('192.168.20.1', 24)} + bgp_router_addresses = {'CSGW1': '192.168.150.1', 'CSGW2': '192.168.150.2'} + + locations = [ + {'router': router_a, 'port': port_a, 'neighbor': router_b}, + {'router': router_b, 'port': port_b, 'neighbor': router_a}, + ] + for location in locations: + router = location['router'] + port = location['port'] + neighbor = location['neighbor'] + + client_ipv4_address_prefix = client_if_addr[router] + bgp_router_address = bgp_router_addresses[router] + bgp_neighbor_address = bgp_router_addresses[neighbor] + + config_rules = [ + network_instance(ni_name, 'L3VRF', bgp_router_address, '65001:1'), + network_instance_add_protocol_direct(ni_name, 'L3VRF'), + network_instance_add_protocol_static(ni_name, 'L3VRF'), + network_instance_add_protocol_bgp(ni_name, 'L3VRF', bgp_router_address, '65001', neighbors=[ + (bgp_neighbor_address, '65001') + ]), + network_instance_add_table_connection( + ni_name, 'DIRECTLY_CONNECTED', 'BGP', 'IPV4', 'ACCEPT_ROUTE', bgp_as='65001' + ), + network_instance_add_table_connection( + ni_name, 'STATIC', 'BGP', 'IPV4', 'ACCEPT_ROUTE', bgp_as='65001' + ), + + interface(client_if_name, 0, if_type='ethernetCsmacd', mtu=1500), + network_instance_interface(ni_name, 'L3VRF', client_if_name, 0), + interface(client_if_name, 0, if_type='ethernetCsmacd', mtu=1500, + ipv4_address_prefix=client_ipv4_address_prefix, enabled=True), + + interface(port, 0, if_type='ethernetCsmacd', mtu=1500), + network_instance_interface(ni_name, 'L3VRF', port, 0), + interface(port, 0, if_type='ethernetCsmacd', mtu=1500, + ipv4_address_prefix=(bgp_router_address, 24), enabled=True), + ] + + device = get_device( + context_client, router, rw_copy=True, include_endpoints=False, + include_config_rules=False, include_components=False + ) + device.device_config.config_rules.extend(config_rules) + device_client.ConfigureDevice(device) + + +def deconfigure(router_a, port_a, router_b, port_b, ni_name): + context_client = ContextClient() + device_client = DeviceClient() + + client_if_name = 'ce1' + + locations = [ + {'router': router_a, 'port': port_a, 'neighbor': router_b}, + {'router': router_b, 'port': port_b, 'neighbor': router_a}, + ] + for location in locations: + router = location['router'] + port = location['port'] + #neighbor = location['neighbor'] + + config_rules = [ + network_instance_interface(ni_name, 'L3VRF', client_if_name, 0, delete=True), + network_instance_interface(ni_name, 'L3VRF', port, 0, delete=True), + #interface(client_if_name, 0, delete=True), + #interface(port, 0, delete=True), + network_instance(ni_name, 'L3VRF', delete=True), + ] + + device = get_device( + context_client, router, rw_copy=True, include_endpoints=False, + include_config_rules=False, include_components=False + ) + device.device_config.config_rules.extend(config_rules) + device_client.ConfigureDevice(device)