Skip to content
Snippets Groups Projects
ServiceServiceServicerImpl.py 30.7 KiB
Newer Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, random, uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, Optional
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
    OperationFailedException
)
from common.proto.context_pb2 import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Connection, ConstraintActionEnum, Empty, Service, ServiceId, ServiceStatusEnum,
    ServiceTypeEnum, TopologyId
)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.pathcomp_pb2 import PathCompRequest
Carlos Manso's avatar
Carlos Manso committed
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import (
    is_deployed_e2e_orch, is_deployed_optical, is_deployed_te
)
from context.client.ContextClient import ContextClient
from e2e_orchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from pathcomp.frontend.client.PathCompClient import PathCompClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from service.service.tools.ConnectionToString import connection_to_string
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from service.client.TEServiceClient import TEServiceClient
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .task_scheduler.TaskScheduler import TasksScheduler
Carlos Manso's avatar
Carlos Manso committed
from .tools.GeodesicDistance import gps_distance
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .tools.OpticalTools import (
    add_lightpath, delete_lightpath, adapt_reply, get_device_name_from_uuid,
    get_optical_band, refresh_opticalcontroller, DelFlexLightpath
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Service', 'RPC')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

class ServiceServiceServicerImpl(ServiceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Creating Servicer...')
        self.service_handler_factory = service_handler_factory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = []
            for service_endpoint_id in request.service_endpoint_ids:
                unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = []
            for service_constraint in request.service_constraints:
                unexpected_constraints.append(grpc_message_to_json(service_constraint))
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # check that service does not exist
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        current_service = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # just create the service in the Context database to lock the service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        _service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Identify service constraints        
        num_disjoint_paths = None
        is_diverse = False
        for constraint in request.service_constraints:
            constraint_kind = constraint.WhichOneof('constraint')
            if constraint_kind == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
            elif constraint_kind == 'custom':
                if constraint.custom.constraint_type == 'diversity': is_diverse = True
            elif constraint_kind == 'endpoint_location':
                location = constraint.endpoint_location.location
                if location.WhichOneof('location') == 'gps_position': gps_location_aware = True
            else:
                continue
        LOGGER.debug('num_disjoint_paths={:s}'.format(str(num_disjoint_paths)))
        LOGGER.debug('is_diverse={:s}'.format(str(is_diverse)))
        LOGGER.debug('gps_location_aware={:s}'.format(str(gps_location_aware)))

        if _service is not None and num_disjoint_paths is None and not is_diverse and gps_location_aware:
            LOGGER.debug('  Removing previous service')
            tasks_scheduler = TasksScheduler(self.service_handler_factory)
            tasks_scheduler.compose_from_service(_service, is_delete=True)
            tasks_scheduler.execute_all()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service = Service()
        service.CopyFrom(request if _service is None else _service)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:                     # pylint: disable=no-member
            service.service_type = request.service_type                                     # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED     # pylint: disable=no-member

        if is_deployed_te() and service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
            # TE service:
            context_client.SetService(request)

            te_service_client = TEServiceClient()
            service_status = te_service_client.RequestLSP(service)

            if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE:
                _service : Optional[Service] = get_service_by_id(
                    context_client, request.service_id, rw_copy=True,
                    include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
                _service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
                service_id = context_client.SetService(_service)
                return service_id
            else:
                MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})'
                context_uuid = request.service_id.context_id.context_uuid.uuid
                service_uuid = request.service_id.service_uuid.uuid
                service_key = '{:s}/{:s}'.format(context_uuid, service_uuid)
                str_service_status = ServiceStatusEnum.Name(service_status.service_status)
                raise Exception(MSG.format(service_key, str_service_status))

        if is_deployed_e2e_orch() and service.service_type == ServiceTypeEnum.SERVICETYPE_E2E:
            # End-to-End service:
            service_id_with_uuids = context_client.SetService(request)

            service_with_uuids = get_service_by_id(
                context_client, service_id_with_uuids, rw_copy=False,
                include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

            e2e_orch_request = E2EOrchestratorRequest()
            e2e_orch_request.service.CopyFrom(service_with_uuids)

            e2e_orch_client = E2EOrchestratorClient()
            e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request)

            # Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies
            # inter-dependencies among the services and connections retrieved and produces a
            # schedule of tasks (an ordered list of tasks to be executed) to implement the
            # requested create/update operation.
            tasks_scheduler = TasksScheduler(self.service_handler_factory)
            # e2e_orch_reply should be compatible with pathcomp_reply
            # TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply()
            tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False)
            tasks_scheduler.execute_all()
            return service_with_uuids.service_id


        # Normal service
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_endpoint_ids[:] # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)    # pylint: disable=no-member
        device_list = context_client.ListDevices(Empty())

        LOGGER.debug('[before] request={:s}'.format(grpc_message_to_json_string(request)))
        for constraint in request.service_constraints:
            if constraint.action == ConstraintActionEnum.CONSTRAINTACTION_UNDEFINED:
                # Field action is new; assume if not set, it means SET
                constraint.action = ConstraintActionEnum.CONSTRAINTACTION_SET

            if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
            if constraint.WhichOneof('constraint') != 'endpoint_location': continue
            if constraint.endpoint_location.HasField('endpoint_id'): continue

            service_location = constraint.endpoint_location.location
            distances = {}
            for device in device_list.devices:
                for endpoint in device.device_endpoints:
                    if not endpoint.endpoint_location.HasField('gps_position'): continue
                    distance = gps_distance(service_location.gps_position, endpoint.endpoint_location.gps_position)
                    distances[distance] = endpoint.endpoint_id

            closer_endpoint_id = distances[min(distances)]
            constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)

            service_endpoint_ids = [
                endpoint_id.endpoint_uuid
                for endpoint_id in service.service_endpoint_ids
            ]
            if closer_endpoint_id not in service_endpoint_ids:
                service.service_endpoint_ids.append(closer_endpoint_id)
        LOGGER.debug('[after] request={:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.debug('[after] service={:s}'.format(grpc_message_to_json_string(service)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_constraints[:]  # pylint: disable=no-member
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)  # pylint: disable=no-member

        del service.service_config.config_rules[:]  # pylint: disable=no-member
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        service_id_with_uuids = context_client.SetService(service)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # PathComp requires endpoints, constraints and config rules
        service_with_uuids = get_service_by_id(
            context_client, service_id_with_uuids, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
        num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths
        num_expected_endpoints = num_disjoint_paths * 2
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        if is_deployed_optical() and service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
            context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
            topology_id_x = json_topology_id(
                DEFAULT_TOPOLOGY_NAME, context_id_x)
            topology_details = context_client.GetTopologyDetails(
                TopologyId(**topology_id_x))
        
            refresh_opticalcontroller(topology_id_x)
            # devices = get_devices_in_topology(context_client, TopologyId(**topology_id_x), ContextId(**context_id_x))
            devices = topology_details.devices
            context_uuid_x = topology_details.topology_id.context_id.context_uuid.uuid
            topology_uuid_x = topology_details.topology_id.topology_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            device_names : Dict[str, str] = dict()
            for device in devices:
                device_uuid = device.device_id.device_uuid.uuid
                device_names[device_uuid] = device.name # ID => name
                device_names[device.name] = device.name # name => name (that way, if not present, crash)
            devs = []
            ports = []
            for endpoint_id in service.service_endpoint_ids:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
                endpoint_device_name = device_names[endpoint_device_uuid]
                devs.append(endpoint_device_name)
                ports.append(endpoint_id.endpoint_uuid.uuid)
            src = devs[0]
            dst = devs[1]
            bidir = None
            ob_band = None
            bitrate = 100
            for constraint in service.service_constraints:
                if "bandwidth" in constraint.custom.constraint_type:
                    bitrate = int(float(constraint.custom.constraint_value))
                elif "bidirectionality" in constraint.custom.constraint_type:
                    bidir = int(constraint.custom.constraint_value)
                elif "optical-band-width" in constraint.custom.constraint_type:
                    ob_band = int(constraint.custom.constraint_value)

            # to get the reply form the optical module
            reply_txt = add_lightpath(src, dst, bitrate, bidir, ob_band)
            # reply with 2 transponders and 2 roadms
            reply_json = json.loads(reply_txt)
            LOGGER.debug('[optical] reply_json[{:s}]={:s}'.format(str(type(reply_json)), str(reply_json)))
            optical_band_txt = ""
            if "new_optical_band" in reply_json.keys():
                if reply_json["new_optical_band"] == 1:
                    if reply_json["parent_opt_band"]:
                        if "parent_opt_band" in reply_json.keys():
                            parent_ob = reply_json["parent_opt_band"]
                            LOGGER.debug('Parent optical-band={}'.format(parent_ob))
                            optical_band_txt = get_optical_band(parent_ob)
                        else:
                            LOGGER.debug('expected optical band not found')
                    else:
                        LOGGER.debug('expected optical band not found')
                else:
                    LOGGER.debug('Using existing optical band')
            else:
                LOGGER.debug('Using existing optical band')
            if reply_txt is not None:
                optical_reply = adapt_reply(
                    devices, _service, reply_json, context_uuid_x, topology_uuid_x, optical_band_txt
                )

                tasks_scheduler.compose_from_pathcompreply(
                    optical_reply, is_delete=False)
        else:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
                pathcomp_request = PathCompRequest()
                pathcomp_request.services.append(service_with_uuids)    # pylint: disable=no-member

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                if num_disjoint_paths is None or num_disjoint_paths in {0, 1} :
                    pathcomp_request.shortest_path.Clear()              # pylint: disable=no-member
                else:
                    pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths  # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                pathcomp = PathCompClient()
                pathcomp_reply = pathcomp.Compute(pathcomp_request)
                pathcomp.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
                # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
                # executed) to implement the requested create/update operation.
                tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)
        tasks_scheduler.execute_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return service_with_uuids.service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
        # being modified.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
        if service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
Mohammad Ismaeel's avatar
Mohammad Ismaeel committed
        if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
             service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client.SetService(service)

        if is_deployed_te() and service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # TE service
            te_service_client = TEServiceClient()
            te_service_client.DeleteLSP(request)
            context_client.RemoveService(request)
            return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if is_deployed_optical() and service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
            params = {
                "src"     : None,
                "dst"     : None,
                "bitrate" : None,
                'ob_id'   : None,
                'flow_id' : None
            devs = []
            context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
            topology_id_x = json_topology_id(
                DEFAULT_TOPOLOGY_NAME, context_id_x)
            topology_details = context_client.GetTopologyDetails(
                TopologyId(**topology_id_x))
            devices = topology_details.devices
            for endpoint_id in service.service_endpoint_ids:
                devs.append(endpoint_id.device_id.device_uuid.uuid)
            src = get_device_name_from_uuid(devices, devs[0])
            dst = get_device_name_from_uuid(devices, devs[1])
            bitrate = 100
            for constraint in service.service_constraints:
                if "bandwidth" in constraint.custom.constraint_type:
                    bitrate = int(float(constraint.custom.constraint_value))
                    break
Carlos Manso's avatar
Carlos Manso committed
            if service.service_constraints:
                bitrate = int(float(
                    service.service_constraints[0].custom.constraint_value
                ))
            if len(service.service_config.config_rules) > 0:
                c_rules_dict = json.loads(
                service.service_config.config_rules[0].custom.resource_value)
                ob_id = None
                flow_id = None
                if "ob_id" in c_rules_dict:
                    ob_id = c_rules_dict["ob_id"]
                if ("flow_id" in c_rules_dict):
                    flow_id = c_rules_dict["flow_id"]
                    #if ("ob_id" in c_rules_dict):
                    #    ob_id = c_rules_dict["ob_id"]
                params['bitrate'] = bitrate
                params['dst'    ] = dst
                params['src'    ] = src
                params['ob_id'  ] = ob_id
                params['flow_id'] = flow_id

            tasks_scheduler = TasksScheduler(self.service_handler_factory)
            tasks_scheduler.compose_from_optical_service(service, params=params, is_delete=True)
            tasks_scheduler.execute_all()
            return Empty()
        # Normal service
        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if len(request.service_endpoint_ids) > 0:
            raise NotImplementedException('update-endpoints')

        if len(request.service_constraints) > 0:
            raise NotImplementedException('update-constraints')

        if len(request.service_config.config_rules) > 0:
            raise NotImplementedException('update-config-rules')

        context_client = ContextClient()

        updated_service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=True,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)

        if updated_service is None:
            raise NotFoundException('service', request.service_id.service_uuid.uuid)

        # pylint: disable=no-member
        if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
            raise InvalidArgumentException(
                'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
            )

        # Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
        # being modified.
        # pylint: disable=no-member
        updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING

        # Update endpoints
        # pylint: disable=no-member
        #del updated_service.service_endpoint_ids[:]
        #updated_service.service_endpoint_ids.extend(request.service_endpoint_ids)

        # Update constraints
        # pylint: disable=no-member
        #del updated_service.service_constraints[:]
        #updated_service.service_constraints.extend(request.service_constraints)

        # Update config rules
        # pylint: disable=no-member
        #del updated_service.service_config.config_rules[:]
        #updated_service.service_config.config_rules.extend(request.service_config.config_rules)

        updated_service_id_with_uuids = context_client.SetService(updated_service)

        # PathComp requires endpoints, constraints and config rules
        updated_service_with_uuids = get_service_by_id(
            context_client, updated_service_id_with_uuids, rw_copy=True,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Get active connection
        connections = context_client.ListConnections(updated_service_id_with_uuids)
        if len(connections.connections) == 0:
            MSG = 'Service({:s}) has no connections'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_extra_details = MSG.format(str_service_id)
            raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
        if len(connections.connections) > 1:
            MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            num_connections = len(connections.connections)
            str_connections = grpc_message_to_json_string(connections)
            str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
            raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)

        old_connection = connections.connections[0]
        if len(old_connection.sub_service_ids) > 0:
            MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
            str_connection = grpc_message_to_json_string(old_connection)
            str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
            raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)

        # Find alternative connections
        # pylint: disable=no-member
        pathcomp_request = PathCompRequest()
        pathcomp_request.services.append(updated_service_with_uuids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #pathcomp_request.k_disjoint_path.num_disjoint = 100
        pathcomp_request.k_shortest_path.k_inspection = 100
        pathcomp_request.k_shortest_path.k_return = 3

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
        pathcomp = PathCompClient()
        pathcomp_reply = pathcomp.Compute(pathcomp_request)
        pathcomp.close()
        LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

        if len(pathcomp_reply.services) == 0:
            MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)

        if len(pathcomp_reply.services) > 1:
            MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)

        if len(pathcomp_reply.connections) == 0:
            MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)

        # compute a string representing the old connection
        str_old_connection = connection_to_string(old_connection)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        candidate_new_connections = list()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for candidate_new_connection in pathcomp_reply.connections:
            str_candidate_new_connection = connection_to_string(candidate_new_connection)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if str_candidate_new_connection == str_old_connection: continue
            candidate_new_connections.append(candidate_new_connection)

        if len(candidate_new_connections) == 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_old_connection = grpc_message_to_json_string(old_connection)
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        
        str_candidate_new_connections = [
            grpc_message_to_json_string(candidate_new_connection)
            for candidate_new_connection in candidate_new_connections
        ]
        LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        new_connection = random.choice(candidate_new_connections)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
        tmp_connection = Connection()
        tmp_connection.CopyFrom(new_connection)
        tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
        new_connection = tmp_connection
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # Feed TaskScheduler with the service to update, the old connection to
        # deconfigure and the new connection to configure. It will produce a
        # schedule of tasks (an ordered list of tasks to be executed) to
        # implement the requested changes.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_service_connection_update(
            updated_service_with_uuids, old_connection, new_connection)
        tasks_scheduler.execute_all()

        return Empty()