Commit 1d0bf832 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Change the name of a link. Change MAC address of interface. Remove commented...

Change the name of a link. Change MAC address of interface. Remove commented lines. Comment some assert services.
parent 0f6c1277
Loading
Loading
Loading
Loading
+33 −2
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ from common.method_wrappers.ServiceExceptions import (
)
from common.proto.context_pb2 import (
    Connection, ConstraintActionEnum, Empty, Service, ServiceId, ServiceStatusEnum,
    ServiceTypeEnum, TopologyId
    ServiceTypeEnum, TopologyId, Constraint, Constraint_Exclusions
)
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest
@@ -105,6 +105,20 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
        service_id = context_client.SetService(request)
        return service_id

    def _detect_connection(self, conn, label):
        """Return the name of the first non-edge device on the connection path.

        Walks the path hops of ``conn``, skipping the edge switches
        ("sw1", "sw5"), and returns the name of the first other device
        found. Returns ``None`` when every hop is an edge switch.

        :param conn: Connection whose ``path_hops_endpoint_ids`` are inspected.
        :param label: Free-text tag included in the diagnostic log output.
        """
        LOGGER.info("------------------------------------------------------------")
        LOGGER.info(f"--- Connection - {label}")
        for hop in conn.path_hops_endpoint_ids:
            dev = self._context_client.GetDevice(hop.device_id)
            if dev.name in ("sw1", "sw5"):
                # Edge switches are never the "interesting" middle device.
                # (Typo fix: original logged "swicthes".)
                LOGGER.info("-----------> Skip edge switches")
                continue
            LOGGER.info(f" Device: {dev.name}")
            return dev.name
        LOGGER.info("------------------------------------------------------------")
        return None
    

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
@@ -113,12 +127,15 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
        _service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
        LOGGER.info('_service={:s}'.format(str(_service)))

        # Identify service constraints        
        num_disjoint_paths = None
        is_diverse = False
        gps_location_aware = False
        LOGGER.info('request={:s}'.format(str(request)))
        for constraint in request.service_constraints:
            LOGGER.info(f"--------------------------------------------> Previous exclusion_constraint={constraint}")
            constraint_kind = constraint.WhichOneof('constraint')
            if constraint_kind == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
@@ -127,6 +144,15 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
            elif constraint_kind == 'endpoint_location':
                location = constraint.endpoint_location.location
                if location.WhichOneof('location') == 'gps_position': gps_location_aware = True
            elif constraint_kind == 'exclusions':
                current_dev_id = constraint.exclusions.device_ids[0].device_uuid.uuid
                LOGGER.info(f"--------------------------------------------> Previously excluded_device={current_dev_id}")
                possible_dev_ids = ["sw2", "sw3", "sw4"]
                possible_dev_ids.remove(current_dev_id)
                target_dev_to_exclude = possible_dev_ids
                LOGGER.info(f"--------------------------------------------> Newly excluded_device={target_dev_to_exclude[0]}")
                constraint.exclusions.device_ids[0].device_uuid.uuid = target_dev_to_exclude[0]
                LOGGER.info(f"--------------------------------------------> New exclusion_constraint={constraint}")
            else:
                continue

@@ -324,13 +350,17 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
                tasks_scheduler.compose_from_pathcompreply(
                    optical_reply, is_delete=False)
        else:
            LOGGER.info('len(service_with_uuids.service_endpoint_ids)={:s}'.format(str(service_with_uuids.service_endpoint_ids)))
            LOGGER.info('len(service_with_uuids.service_endpoint_ids)={:s}'.format(str(num_expected_endpoints)))
            if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
                pathcomp_request = PathCompRequest()
                pathcomp_request.services.append(service_with_uuids)    # pylint: disable=no-member

                LOGGER.info('pathcomp_request {:s}'.format(str(pathcomp_request)))
                if num_disjoint_paths is None or num_disjoint_paths in {0, 1} :
                    LOGGER.info('No other path is available')
                    pathcomp_request.shortest_path.Clear()              # pylint: disable=no-member
                else:
                    LOGGER.info('Number of path available {:s}'.format(str(num_disjoint_paths)))
                    pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths  # pylint: disable=no-member

                pathcomp = PathCompClient()
@@ -340,6 +370,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
                # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
                # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
                # executed) to implement the requested create/update operation.
                LOGGER.info(f"---------------------------> Calling compose_from_pathcompreply")
                tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
+295 −2
Original line number Diff line number Diff line
@@ -15,8 +15,9 @@
import graphlib, logging, queue, time
from typing import TYPE_CHECKING, Dict, Tuple
from common.proto.context_pb2 import (
    Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum, ConnectionList
    Uuid, Empty, DeviceId, Connection, ConnectionId, EndPointId, EndPointIdList, Service, ServiceId, ServiceStatusEnum, ConnectionList, ServiceTypeEnum
)
from common.tools.object_factory.Device import json_device_id
from common.proto.pathcomp_pb2 import PathCompReply
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
@@ -32,6 +33,8 @@ from .TaskExecutor import CacheableObjectType, TaskExecutor
from .tasks.Task_OpticalServiceConfigDelete import Task_OpticalServiceConfigDelete
from service.service.tools.OpticalTools import delete_lightpath 

from uuid import uuid4

if TYPE_CHECKING:
    from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory

@@ -169,23 +172,313 @@ class TasksScheduler:
        self._dag.add(service_config_key, service_pending_removal_key)
        return service_config_key

    def detect_middle_switch(self, conn):
        """Return the device name at the second hop of *conn*.

        Assumes the path hop at index 1 is the middle switch — TODO confirm
        this holds for all connection layouts fed into this scheduler.
        """
        middle_hop = conn.path_hops_endpoint_ids[1]
        middle_dev = self._context_client.GetDevice(middle_hop.device_id)
        return middle_dev.name

    def report_connection(self, conn, label):
        """Log every hop of *conn* (device name and endpoint UUID) under *label*.

        Purely diagnostic: resolves each hop's device via the context client
        and writes it to the log. Returns nothing. (Removed an unused local
        ``dev_uuid`` that was computed per hop and never read.)
        """
        LOGGER.info("------------------------------------------------------------")
        LOGGER.info(f"--- Connection - {label}")
        for hop in conn.path_hops_endpoint_ids:
            dev = self._context_client.GetDevice(hop.device_id)
            ep  = hop.endpoint_uuid
            LOGGER.info(f" Device: {dev.name}")
            LOGGER.info(f"Endpoint: {ep}")
            LOGGER.info("")
        LOGGER.info("------------------------------------------------------------")

    def shorten_connection(self, conn):
        """Return a copy of *conn* without its first and last path hops.

        The connection/service identifiers are copied verbatim; only the
        interior hops are retained. The original hard-coded the last hop as
        index 5 (valid only for 6-hop paths); the index is now derived from
        the actual path length, which is behavior-identical for the expected
        6-hop connections and correct for any other length.
        """
        new_conn = Connection()
        new_conn.connection_id.CopyFrom(conn.connection_id)
        new_conn.service_id.CopyFrom(conn.service_id)
        last_idx = len(conn.path_hops_endpoint_ids) - 1
        for i, hop in enumerate(conn.path_hops_endpoint_ids):
            if i == 0 or i == last_idx:
                LOGGER.info(f"Skipping first or last hop")
            else:
                new_conn.path_hops_endpoint_ids.add().CopyFrom(hop)
        return new_conn

    def replaced_hops(self, conn):
        """Swap the middle switch of *conn* for a randomly chosen alternative.

        Detects which of sw2/sw3/sw4 currently sits in the middle of the
        path, picks one of the remaining two at random, and builds a new
        Connection containing only the replacement switch's hops (its
        endpoints named "1" and "2", matched in hop order).

        :returns: tuple ``(new_connection, selected_switch_name,
                  selected_switch_device_uuid)``.
        :raises ValueError: when the replacement device lacks an endpoint
                 with the expected name (previously this surfaced as an
                 opaque ``TypeError`` from ``CopyFrom(None)``).
        """
        import random  # local import: only this replacement path needs it

        # Find who is now in the middle
        sw_to_exclude = self.detect_middle_switch(conn)

        new_conn = Connection()
        new_conn.connection_id.CopyFrom(conn.connection_id)
        new_conn.service_id.CopyFrom(conn.service_id)

        LOGGER.info(f"### To exclude {sw_to_exclude}")
        possible_switches = ["sw2", "sw3", "sw4"]
        possible_switches.remove(sw_to_exclude)
        LOGGER.info(f"### Filtered switches found: {possible_switches}")
        selected_sw = random.choice(possible_switches)
        LOGGER.info(f"### Selected switch: {selected_sw}")
        new_dev_id = DeviceId(**json_device_id(selected_sw))
        new_dev = self._context_client.GetDevice(new_dev_id)
        LOGGER.info(f"### {selected_sw} ID: {new_dev.device_id}")

        # repl_num tracks which endpoint ("1", then "2") of the replacement
        # switch should back the next replaced hop.
        repl_num = 1
        for hop in conn.path_hops_endpoint_ids:
            cur_dev = self._context_client.GetDevice(hop.device_id)
            LOGGER.info(f"################### cur_dev.name {cur_dev.name}")

            if cur_dev.name == sw_to_exclude:
                LOGGER.info(f"################### Replace {sw_to_exclude} with {selected_sw}")
                new_ep = EndPointId()
                new_ep.topology_id.CopyFrom(hop.topology_id)
                new_ep.device_id.CopyFrom(new_dev_id)

                new_ep_uuid = None
                for ep in new_dev.device_endpoints:
                    # Endpoint named "3" is treated as the INT port — skip it.
                    if ep.name == "3":
                        LOGGER.info(f"##### Skipping INT endpoint {ep.name}")
                        continue
                    if ep.name == str(repl_num):
                        new_ep_uuid = ep.endpoint_id.endpoint_uuid
                if new_ep_uuid is None:
                    raise ValueError(
                        "No endpoint named '{}' found on device {}".format(repl_num, selected_sw))
                new_ep.endpoint_uuid.CopyFrom(new_ep_uuid)

                # Add path hops to new connection
                new_conn.path_hops_endpoint_ids.add().CopyFrom(new_ep)
                repl_num += 1

        return new_conn, selected_sw, new_dev.device_id.device_uuid.uuid

    def modify_edge_hops(self, conn, replaced_dev_name, replaced_dev_uuid):
        """Find the edge-switch endpoints paired with the replaced switch.

        Scans all non-"tfs" (dataplane) links, keeps the two that touch
        *replaced_dev_name*, and for each returns the endpoint on the
        opposite side that belongs to an edge switch (sw1/sw5).

        :returns: list of ``(device_id, endpoint_uuid)`` tuples, or ``None``
                  when the link list cannot be fetched.
        """
        LOGGER.info(f"------------------> Looking for device {replaced_dev_name} with ID {replaced_dev_uuid}")
        link_list = self._context_client.ListLinks(Empty())
        # NOTE(review): protobuf messages are always truthy, so this guard
        # likely never fires; kept to preserve the original intent — confirm
        # whether ListLinks can actually return a falsy value here.
        if not link_list:
            LOGGER.info(f"------------------> LINKS CANNOT BE FETCHED")
            return None

        LOGGER.info(f"------------------> Total links number: {len(link_list.links)}")

        # Dataplane links are those whose name does not contain "tfs".
        dp_links = [link for link in link_list.links if "tfs" not in link.name]

        LOGGER.info(f"------------------> Dataplane links number: {len(dp_links)}")

        # Only links that touch the replaced switch are relevant.
        relevant_links = [link for link in dp_links if replaced_dev_name in link.name]
        # Message fixed: the check requires EXACTLY two links (the original
        # message claimed only "more than two" was invalid).
        assert len(relevant_links) == 2, "Expected exactly two relevant links"

        LOGGER.info(f"------------------> Relevant links number: {len(relevant_links)}")
        LOGGER.info("")

        result = []
        for link in relevant_links:
            LOGGER.info(f"------------------> Link name {link.name}")
            for endpoint_id in link.link_endpoint_ids:
                dev_id = endpoint_id.device_id
                dev_uuid = endpoint_id.device_id.device_uuid.uuid
                dev = self._context_client.GetDevice(dev_id)
                # Keep the endpoint belonging to an edge switch on the far
                # side of the link (i.e. not the replaced device itself).
                if dev_uuid != replaced_dev_uuid and dev.name in ["sw1", "sw5"]:
                    paired_dev = dev
                    paired_dev_id = dev_id
                    paired_ep_uuid = endpoint_id.endpoint_uuid
                    LOGGER.info(f"------------------> FOUND switch {paired_dev.name} with ID {paired_dev_id} and endpoint UUID {paired_ep_uuid}")
                    result.append((paired_dev_id, paired_ep_uuid))

        return result

    def turn_edge_hops_to_connections(self, conn, hop_tuples):
        """Build a Connection whose hops are the given edge-switch endpoints.

        :param conn: template connection; its identifiers and first hop's
                     topology are reused.
        :param hop_tuples: exactly two ``(device_id, endpoint_uuid)`` pairs,
                     as produced by ``modify_edge_hops``.
        :returns: new Connection with the two edge hops added in
                  alphabetical order of switch name (sw1 before sw5).
        """
        # Message fixed: the check requires EXACTLY two hops (the original
        # message claimed only "more than two" was invalid).
        assert len(hop_tuples) == 2, "Expected exactly two edge hops"

        new_conn = Connection()
        new_conn.connection_id.CopyFrom(conn.connection_id)
        new_conn.service_id.CopyFrom(conn.service_id)

        ep_map = {}
        for dev_id, dev_ep in hop_tuples:
            dev = self._context_client.GetDevice(dev_id)
            assert dev.name in ["sw1", "sw5"]
            LOGGER.info(f"################### Edge dev name {dev.name} - endpoint UUID {dev_ep}")
            # Turn Endpoint UUID into EndpointId
            new_ep = EndPointId()
            new_ep.topology_id.CopyFrom(conn.path_hops_endpoint_ids[0].topology_id)
            new_ep.device_id.CopyFrom(dev_id)
            new_ep.endpoint_uuid.CopyFrom(dev_ep)

            ep_map[dev.name] = new_ep

        # Add it to the connection list alphabetically
        LOGGER.info(f"################### Ordered map")
        for k, v in sorted(ep_map.items()):
            LOGGER.info(f"################### {k}: {v}")
            new_conn.path_hops_endpoint_ids.add().CopyFrom(v)
        return new_conn

    def new_l2_connection(self, conn):
        """Rebuild an L2 connection with its middle switch replaced.

        Takes the existing connection *conn*, keeps its first and last hops
        (client<->sw1 and sw5<->server), replaces the two middle-switch hops
        with a randomly selected alternative switch (via replaced_hops), and
        rewires the sw1/sw5 hops towards that new switch. The result is a
        brand-new Connection (fresh UUID) with exactly six hops.

        NOTE(review): hard-codes hop indices 0 and 5, i.e. assumes a 6-hop
        path — confirm this invariant for all L2 services handled here.
        """
        new_conn = Connection()
        new_con_id = ConnectionId()
        new_uuid = Uuid()
        # Fresh connection identity: the rebuilt connection must not collide
        # with the ones deleted by the caller.
        new_uuid.uuid = str(uuid4())
        new_con_id.connection_uuid.CopyFrom(new_uuid)
        new_conn.connection_id.CopyFrom(new_con_id)
        new_conn.service_id.CopyFrom(conn.service_id)

        LOGGER.info("=================================================================================================")
        self.report_connection(conn, label=">>>>>>>> Before <<<<<<<<")
        LOGGER.info("=================================================================================================")

        LOGGER.info("\n")

        LOGGER.info("=================================================================================================")
        self.report_connection(new_conn, label=">>>>>>>> Initial new <<<<<<<<")
        LOGGER.info("=================================================================================================")

        LOGGER.info("\n")

        # Hop-1: client-sw1
        first_hop = conn.path_hops_endpoint_ids[0]

        # Hop 3 + Hop 4: middle-switch (sw2/3/4) to be replaced
        LOGGER.info("=================================================================================================")
        temp_conn = self.shorten_connection(conn)
        self.report_connection(temp_conn, label=">>>>>>>> Initial Shorter <<<<<<<<")
        middle_con, replaced_dev_name, replaced_dev_id = self.replaced_hops(temp_conn)
        self.report_connection(middle_con, label=">>>>>>>> New middle only <<<<<<<<")
        LOGGER.info("=================================================================================================")

        LOGGER.info("\n")

        # # Hop-6: sw5 to server
        last_hop = conn.path_hops_endpoint_ids[5]

        # Hop-2+5: sw1 and sw5 to new middle switch
        LOGGER.info("=================================================================================================")
        temp_conn_2 = self.modify_edge_hops(middle_con, replaced_dev_name, replaced_dev_id)
        assert temp_conn_2, "Cannot proceed without connection tuples"
        replaced_conn = self.turn_edge_hops_to_connections(conn, temp_conn_2)
        self.report_connection(replaced_conn, label=">>>>>>>> Replaced edge-infra hops <<<<<<<<")
        LOGGER.info("=================================================================================================")

        # LOGGER.info("\n")

        # # Add them in order
        # LOGGER.info("=================================================================================================")
        # Assemble the six hops in path order, asserting the running hop
        # count after every stage to catch a mis-built segment early.
        assert len(new_conn.path_hops_endpoint_ids) == 0, "[1st hop] New connection must have no endpoints initially"
        new_conn.path_hops_endpoint_ids.add().CopyFrom(first_hop)
        self.report_connection(new_conn, label=">>>>>>>> NEW step 1 <<<<<<<<")
        assert len(new_conn.path_hops_endpoint_ids) == 1, "[1st hop] client-sw1 is not properly established"
        new_conn.path_hops_endpoint_ids.add().CopyFrom(replaced_conn.path_hops_endpoint_ids[0])
        assert len(new_conn.path_hops_endpoint_ids) == 2, "[2nd hop] sw1-{} is not properly established".format(replaced_dev_name)
        self.report_connection(new_conn, label=">>>>>>>> NEW step 2 <<<<<<<<")
        for c in middle_con.path_hops_endpoint_ids:
            LOGGER.info(f"Replaced {c}")
            new_conn.path_hops_endpoint_ids.add().CopyFrom(c)
        assert len(new_conn.path_hops_endpoint_ids) == 4, "[3rd hop] {} is not properly established".format(replaced_dev_name)
        self.report_connection(new_conn, label=">>>>>>>> NEW step 3 <<<<<<<<")
        new_conn.path_hops_endpoint_ids.add().CopyFrom(replaced_conn.path_hops_endpoint_ids[1])
        assert len(new_conn.path_hops_endpoint_ids) == 5, "[4th hop] {}-sw5 is not properly established".format(replaced_dev_name)
        self.report_connection(new_conn, label=">>>>>>>> NEW step 4 <<<<<<<<")
        new_conn.path_hops_endpoint_ids.add().CopyFrom(last_hop)
        assert len(new_conn.path_hops_endpoint_ids) == 6, "[5th hop] sw5-server is not properly established"
        self.report_connection(new_conn, label=">>>>>>>> NEW step 5 <<<<<<<<")
        LOGGER.info("=================================================================================================")

        return new_conn

    def new_int_connection(self, service_int_id):
        """Build a fresh Connection for the INT service spanning sw1..sw5.

        Each switch contributes one hop: the endpoint whose type is
        "port-int". The connection receives a newly generated UUID and is
        bound to *service_int_id*.
        """
        conn = Connection()
        conn_uuid = Uuid()
        conn_uuid.uuid = str(uuid4())
        conn_id = ConnectionId()
        conn_id.connection_uuid.CopyFrom(conn_uuid)
        conn.connection_id.CopyFrom(conn_id)
        conn.service_id.CopyFrom(service_int_id)

        for idx in range(1, 6):
            dev_id = DeviceId(**json_device_id("sw" + str(idx)))
            dev = self._context_client.GetDevice(dev_id)
            LOGGER.info(f"++++++++++++++++++++++++++ [INT CONN] Dev {dev.name}")

            # All endpoints of a device share the same topology; take it
            # from the first one.
            hop = EndPointId()
            hop.topology_id.CopyFrom(dev.device_endpoints[0].endpoint_id.topology_id)
            hop.device_id.CopyFrom(dev_id)

            for ep in dev.device_endpoints:
                if ep.endpoint_type == "port-int":
                    LOGGER.info(f"++++++++++++++++++++++++++ [INT CONN] Dev {dev.name} - INT endpoint {ep.name} with ID {ep.endpoint_id.endpoint_uuid}")
                    hop.endpoint_uuid.CopyFrom(ep.endpoint_id.endpoint_uuid)

            conn.path_hops_endpoint_ids.add().CopyFrom(hop)

        return conn

    def compose_from_pathcompreply(self, pathcomp_reply : PathCompReply, is_delete : bool = False) -> None:
        """Feed the task-scheduler DAG from a path computation reply.

        Registers every service in *pathcomp_reply* (as create or remove
        tasks, depending on *is_delete*) and wires up connections. Two
        special service types get custom connections:

        - L2 (SERVICETYPE_L2NM): every connection returned by pathcomp is
          deleted and replaced by a single rebuilt one (new_l2_connection).
        - INT (SERVICETYPE_INT): a synthetic connection across the INT
          endpoints of sw1..sw5 is created (new_int_connection).

        :param pathcomp_reply: reply from the PathComp component.
        :param is_delete: True to schedule removal instead of creation.
        """
        t0 = time.time()
        include_service = self._service_remove if is_delete else self._service_create
        include_connection = self._connection_deconfigure if is_delete else self._connection_configure

        is_l2 = False
        is_int = False
        service_int_id = None
        for service in pathcomp_reply.services:
            if service.service_type == ServiceTypeEnum.SERVICETYPE_L2NM:
                LOGGER.info("----------------> Is L2")
                is_l2 = True
            if service.service_type == ServiceTypeEnum.SERVICETYPE_INT:
                LOGGER.info("----------------> Is INT")
                is_int = True
                service_int_id = service.service_id
            include_service(service.service_id)
            self._add_service_to_executor_cache(service)

        if is_l2:
            # Keep the first pathcomp connection only as a template; all
            # connections returned for the L2 service are removed and
            # replaced by the rebuilt one below.
            cached_conn = pathcomp_reply.connections[0]
            for connection in pathcomp_reply.connections:
                con_id = connection.connection_id
                LOGGER.info(f"++++++++++++++++++++ Removing connection {con_id}")
                self._executor.delete_connection(con_id)

            connection = self.new_l2_connection(cached_conn)
            LOGGER.info(f"++++++++++++++++++++ Added new connection {connection.connection_id}")
            connection_key = include_connection(connection.connection_id, connection.service_id)
            self._add_connection_to_executor_cache(connection)
            self._executor.get_service(connection.service_id)
            for sub_service_id in connection.sub_service_ids:
                # Sub-services must complete before the connection task runs.
                _, service_key_done = include_service(sub_service_id)
                self._executor.get_service(sub_service_id)
                self._dag.add(connection_key, service_key_done)
        if is_int:
            connection = self.new_int_connection(service_int_id)
            connection_key = include_connection(connection.connection_id, connection.service_id)
            self._add_connection_to_executor_cache(connection)
            self._executor.get_service(connection.service_id)

        t1 = time.time()
        LOGGER.debug('[compose_from_pathcompreply] elapsed_time: {:f} sec'.format(t1-t0))
+82 −0

File added.

Preview size limit exceeded, changes collapsed.

+107 −0

File added.

Preview size limit exceeded, changes collapsed.

+85 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading