Skip to content
Snippets Groups Projects
ServiceServiceServicerImpl.py 22.7 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, json, logging, random, uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Optional
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
    OperationFailedException)
from common.proto.context_pb2 import (
    Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from pathcomp.frontend.client.PathCompClient import PathCompClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from service.service.tools.ConnectionToString import connection_to_string
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from service.client.TEServiceClient import TEServiceClient
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .task_scheduler.TaskScheduler import TasksScheduler
Carlos Manso's avatar
Carlos Manso committed
from .tools.GeodesicDistance import gps_distance
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Service', 'RPC')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

class ServiceServiceServicerImpl(ServiceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Creating Servicer...')
        self.service_handler_factory = service_handler_factory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = []
            for service_endpoint_id in request.service_endpoint_ids:
                unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = []
            for service_constraint in request.service_constraints:
                unexpected_constraints.append(grpc_message_to_json(service_constraint))
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # check that service does not exist
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        current_service = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # just create the service in the Context database to lock the service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        _service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Identify service constraints        
        num_disjoint_paths = None
        is_diverse = False
        for constraint in request.service_constraints:
            constraint_kind = constraint.WhichOneof('constraint')
            if constraint_kind == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
            elif constraint_kind == 'custom':
                if constraint.custom.constraint_type == 'diversity': is_diverse = True
            elif constraint_kind == 'endpoint_location':
                location = constraint.endpoint_location.location
                if location.WhichOneof('location') == 'gps_position': gps_location_aware = True
            else:
                continue
        LOGGER.debug('num_disjoint_paths={:s}'.format(str(num_disjoint_paths)))
        LOGGER.debug('is_diverse={:s}'.format(str(is_diverse)))
        LOGGER.debug('gps_location_aware={:s}'.format(str(gps_location_aware)))

        if _service is not None and num_disjoint_paths is None and not is_diverse and gps_location_aware:
            LOGGER.debug('  Removing previous service')
            tasks_scheduler = TasksScheduler(self.service_handler_factory)
            tasks_scheduler.compose_from_service(_service, is_delete=True)
            tasks_scheduler.execute_all()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service = Service()
        service.CopyFrom(request if _service is None else _service)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:                     # pylint: disable=no-member
            service.service_type = request.service_type                                     # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED     # pylint: disable=no-member

        if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
            # TE service:
            context_client.SetService(request)

            te_service_client = TEServiceClient()
            service_status = te_service_client.RequestLSP(service)

            if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE:
                _service : Optional[Service] = get_service_by_id(
                    context_client, request.service_id, rw_copy=True,
                    include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
                _service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
                service_id = context_client.SetService(_service)
                return service_id
            else:
                MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})'
                context_uuid = request.service_id.context_id.context_uuid.uuid
                service_uuid = request.service_id.service_uuid.uuid
                service_key = '{:s}/{:s}'.format(context_uuid, service_uuid)
                str_service_status = ServiceStatusEnum.Name(service_status.service_status)
                raise Exception(MSG.format(service_key, str_service_status))

        # Normal service
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_endpoint_ids[:] # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)    # pylint: disable=no-member
        device_list = context_client.ListDevices(Empty())

        LOGGER.debug('[before] request={:s}'.format(grpc_message_to_json_string(request)))
        for constraint in request.service_constraints:
            if constraint.action == ConstraintActionEnum.CONSTRAINTACTION_UNDEFINED:
                # Field action is new; assume if not set, it means SET
                constraint.action = ConstraintActionEnum.CONSTRAINTACTION_SET

            if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
            if constraint.WhichOneof('constraint') != 'endpoint_location': continue
            if constraint.endpoint_location.HasField('endpoint_id'): continue

            service_location = constraint.endpoint_location.location
            distances = {}
            for device in device_list.devices:
                for endpoint in device.device_endpoints:
                    if not endpoint.endpoint_location.HasField('gps_position'): continue
                    distance = gps_distance(service_location.gps_position, endpoint.endpoint_location.gps_position)
                    distances[distance] = endpoint.endpoint_id

            closer_endpoint_id = distances[min(distances)]
            constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)

            service_endpoint_ids = [
                endpoint_id.endpoint_uuid
                for endpoint_id in service.service_endpoint_ids
            ]
            if closer_endpoint_id not in service_endpoint_ids:
                service.service_endpoint_ids.append(closer_endpoint_id)
        LOGGER.debug('[after] request={:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.debug('[after] service={:s}'.format(grpc_message_to_json_string(service)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_constraints[:]  # pylint: disable=no-member
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)  # pylint: disable=no-member

        del service.service_config.config_rules[:]  # pylint: disable=no-member
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        service_id_with_uuids = context_client.SetService(service)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # PathComp requires endpoints, constraints and config rules
        service_with_uuids = get_service_by_id(
            context_client, service_id_with_uuids, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
        num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths
        num_expected_endpoints = num_disjoint_paths * 2
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
Carlos Manso's avatar
Carlos Manso committed
        if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp_request = PathCompRequest()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp_request.services.append(service_with_uuids)    # pylint: disable=no-member
            if num_disjoint_paths is None or num_disjoint_paths in {0, 1}:
                pathcomp_request.shortest_path.Clear()              # pylint: disable=no-member
            else:
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths  # pylint: disable=no-member

            LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
            pathcomp = PathCompClient()
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
            pathcomp.close()
            LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

            # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
            # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
            # executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)
        tasks_scheduler.execute_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return service_with_uuids.service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
        # being modified.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
        if service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
        context_client.SetService(service)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
            # TE service
            te_service_client = TEServiceClient()
            te_service_client.DeleteLSP(request)
            context_client.RemoveService(request)
            return Empty()
        # Normal service
        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if len(request.service_endpoint_ids) > 0:
            raise NotImplementedException('update-endpoints')

        if len(request.service_constraints) > 0:
            raise NotImplementedException('update-constraints')

        if len(request.service_config.config_rules) > 0:
            raise NotImplementedException('update-config-rules')

        context_client = ContextClient()

        updated_service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=True,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)

        if updated_service is None:
            raise NotFoundException('service', request.service_id.service_uuid.uuid)

        # pylint: disable=no-member
        if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
            raise InvalidArgumentException(
                'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
            )

        # Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
        # being modified.
        # pylint: disable=no-member
        updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING

        # Update endpoints
        # pylint: disable=no-member
        #del updated_service.service_endpoint_ids[:]
        #updated_service.service_endpoint_ids.extend(request.service_endpoint_ids)

        # Update constraints
        # pylint: disable=no-member
        #del updated_service.service_constraints[:]
        #updated_service.service_constraints.extend(request.service_constraints)

        # Update config rules
        # pylint: disable=no-member
        #del updated_service.service_config.config_rules[:]
        #updated_service.service_config.config_rules.extend(request.service_config.config_rules)

        updated_service_id_with_uuids = context_client.SetService(updated_service)

        # PathComp requires endpoints, constraints and config rules
        updated_service_with_uuids = get_service_by_id(
            context_client, updated_service_id_with_uuids, rw_copy=True,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Get active connection
        connections = context_client.ListConnections(updated_service_id_with_uuids)
        if len(connections.connections) == 0:
            MSG = 'Service({:s}) has no connections'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_extra_details = MSG.format(str_service_id)
            raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
        if len(connections.connections) > 1:
            MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            num_connections = len(connections.connections)
            str_connections = grpc_message_to_json_string(connections)
            str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
            raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)

        old_connection = connections.connections[0]
        if len(old_connection.sub_service_ids) > 0:
            MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
            str_connection = grpc_message_to_json_string(old_connection)
            str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
            raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)

        # Find alternative connections
        # pylint: disable=no-member
        pathcomp_request = PathCompRequest()
        pathcomp_request.services.append(updated_service_with_uuids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #pathcomp_request.k_disjoint_path.num_disjoint = 100
        pathcomp_request.k_shortest_path.k_inspection = 100
        pathcomp_request.k_shortest_path.k_return = 3

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
        pathcomp = PathCompClient()
        pathcomp_reply = pathcomp.Compute(pathcomp_request)
        pathcomp.close()
        LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

        if len(pathcomp_reply.services) == 0:
            MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)

        if len(pathcomp_reply.services) > 1:
            MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)

        if len(pathcomp_reply.connections) == 0:
            MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)

        # compute a string representing the old connection
        str_old_connection = connection_to_string(old_connection)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        candidate_new_connections = list()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for candidate_new_connection in pathcomp_reply.connections:
            str_candidate_new_connection = connection_to_string(candidate_new_connection)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if str_candidate_new_connection == str_old_connection: continue
            candidate_new_connections.append(candidate_new_connection)

        if len(candidate_new_connections) == 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_old_connection = grpc_message_to_json_string(old_connection)
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        
        str_candidate_new_connections = [
            grpc_message_to_json_string(candidate_new_connection)
            for candidate_new_connection in candidate_new_connections
        ]
        LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        new_connection = random.choice(candidate_new_connections)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
        tmp_connection = Connection()
        tmp_connection.CopyFrom(new_connection)
        tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
        new_connection = tmp_connection
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # Feed TaskScheduler with the service to update, the old connection to
        # deconfigure and the new connection to configure. It will produce a
        # schedule of tasks (an ordered list of tasks to be executed) to
        # implement the requested changes.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_service_connection_update(
            updated_service_with_uuids, old_connection, new_connection)
        tasks_scheduler.execute_all()

        return Empty()