# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Standard-library imports (restored: grpc, json, logging, random, uuid and
# typing.Optional are all used below but were lost in the corrupted paste).
import grpc, json, logging, random, uuid
from typing import Optional

from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
    AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
    OperationFailedException)
from common.proto.context_pb2 import (
    Connection, ConstraintActionEnum, Empty, Service, ServiceId, ServiceStatusEnum,
    ServiceTypeEnum, TopologyId
)
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id

from context.client.ContextClient import ContextClient
from e2e_orchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.client.TEServiceClient import TEServiceClient
from service.service.tools.ConnectionToString import connection_to_string

from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
from .tools.OpticalTools import (
    add_lightpath, delete_lightpath, adapt_reply, get_device_name_from_uuid, get_optical_band
)

# NOTE(review): gps_distance is used in UpdateService but its import was lost in the
# paste; TODO confirm the exact module path against the repository.

# Module-level logger and metrics pool consumed by the
# @safe_and_metered_rpc_method decorators on the RPC methods below.
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Service', 'RPC')
class ServiceServiceServicerImpl(ServiceServiceServicer):
    """gRPC servicer implementing the Service component RPCs.

    Delegates per-technology provisioning to service handlers obtained from the
    injected ServiceHandlerFactory, and orchestrates create/update/delete and
    connection recomputation through the Context, PathComp, TE and
    E2E-Orchestrator components.
    """

    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
        """Keep a reference to the factory used to instantiate service handlers."""
        self.service_handler_factory = service_handler_factory
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
    """Create an (empty) service entry in the Context database.

    The request must carry only the ServiceId (and optionally the service type):
    endpoints, constraints and config rules are rejected on purpose and must be
    provided afterwards through UpdateService.

    Raises:
        InvalidArgumentException: endpoints, constraints or config rules present.
        AlreadyExistsException: a service with the same id already exists.
    """
    # Reject endpoints: they must be configured through UpdateService.
    if len(request.service_endpoint_ids) > 0:
        unexpected_endpoints = [
            grpc_message_to_json(service_endpoint_id)
            for service_endpoint_id in request.service_endpoint_ids
        ]
        str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
        raise InvalidArgumentException(
            'service.service_endpoint_ids', str_unexpected_endpoints,
            extra_details='RPC method CreateService does not accept Endpoints. '\
                'Endpoints should be configured after creating the service.')

    # Reject constraints: they must be configured through UpdateService.
    if len(request.service_constraints) > 0:
        unexpected_constraints = [
            grpc_message_to_json(service_constraint)
            for service_constraint in request.service_constraints
        ]
        str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
        raise InvalidArgumentException(
            'service.service_constraints', str_unexpected_constraints,
            extra_details='RPC method CreateService does not accept Constraints. '\
                'Constraints should be configured after creating the service.')

    # Reject config rules: they must be configured through UpdateService.
    if len(request.service_config.config_rules) > 0:
        unexpected_config_rules = grpc_message_to_json(request.service_config)
        unexpected_config_rules = unexpected_config_rules['config_rules']
        str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
        raise InvalidArgumentException(
            'service.service_config.config_rules', str_unexpected_config_rules,
            extra_details='RPC method CreateService does not accept Config Rules. '\
                'Config Rules should be configured after creating the service.')

    # Check that the service does not exist yet.
    context_client = ContextClient()
    current_service = get_service_by_id(
        context_client, request.service_id, rw_copy=False,
        include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
    if current_service is not None:
        context_uuid = request.service_id.context_id.context_uuid.uuid
        service_uuid = request.service_id.service_uuid.uuid
        raise AlreadyExistsException(
            'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

    # Just create the service in the Context database to lock the service_id;
    # UpdateService will perform the changes on the underlying resources.
    service_id = context_client.SetService(request)
    return service_id
# NOTE(review): this method is heavily corrupted by a copy/paste from a repository
# web UI: indentation is flattened, committer-name artifacts and runs of bare pasted
# line numbers replace parts of the original code. Code is preserved byte-for-byte;
# review comments below mark every corrupted or suspicious spot.
def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
# Purpose (from visible code): provision/update a service. Resolves constraints,
# delegates TE services to the TE component, E2E services to the E2E orchestrator,
# optical services to the optical module, and other types to PathComp + TaskScheduler.
# Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
# being modified.
# NOTE(review): 'context_client' is used below but never initialized in this method;
# a 'context_client = ContextClient()' line was presumably lost in the paste — confirm.
_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=False,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
# Identify service constraints
num_disjoint_paths = None
is_diverse = False

# NOTE(review): the two lines below are VCS web-UI artifacts, not code.
Lluis Gifre Renom
committed
gps_location_aware = False
# Scan the requested constraints for SLA disjoint paths, diversity and GPS locations.
for constraint in request.service_constraints:
constraint_kind = constraint.WhichOneof('constraint')
if constraint_kind == 'sla_availability':
num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
elif constraint_kind == 'custom':
if constraint.custom.constraint_type == 'diversity': is_diverse = True
elif constraint_kind == 'endpoint_location':
location = constraint.endpoint_location.location
if location.WhichOneof('location') == 'gps_position': gps_location_aware = True
else:
continue
LOGGER.debug('num_disjoint_paths={:s}'.format(str(num_disjoint_paths)))
LOGGER.debug('is_diverse={:s}'.format(str(is_diverse)))

# NOTE(review): the two lines below are VCS web-UI artifacts, not code.
Lluis Gifre Renom
committed
LOGGER.debug('gps_location_aware={:s}'.format(str(gps_location_aware)))
# GPS-located services cannot be updated incrementally: tear down the previous
# instance before re-provisioning.
if _service is not None and num_disjoint_paths is None and not is_diverse and gps_location_aware:
LOGGER.debug(' Removing previous service')
tasks_scheduler = TasksScheduler(self.service_handler_factory)
tasks_scheduler.compose_from_service(_service, is_delete=True)
tasks_scheduler.execute_all()
# NOTE(review): 'service' is used before assignment; a 'service = Service()'
# creation line was presumably lost in the paste — confirm.
service.CopyFrom(request if _service is None else _service)
if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN: # pylint: disable=no-member
service.service_type = request.service_type # pylint: disable=no-member
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED # pylint: disable=no-member
if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
# TE service:
context_client.SetService(request)
te_service_client = TEServiceClient()
service_status = te_service_client.RequestLSP(service)
if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE:
_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=True,
include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
service_id = context_client.SetService(_service)
return service_id
else:
MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})'
context_uuid = request.service_id.context_id.context_uuid.uuid
service_uuid = request.service_id.service_uuid.uuid
service_key = '{:s}/{:s}'.format(context_uuid, service_uuid)
str_service_status = ServiceStatusEnum.Name(service_status.service_status)
raise Exception(MSG.format(service_key, str_service_status))
# NOTE(review): the run of bare numbers below is a block of pasted line numbers that
# replaced lost original code — this chunk must be recovered from the repository.
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E:
# End-to-End service:
service_id_with_uuids = context_client.SetService(request)
service_with_uuids = get_service_by_id(
context_client, service_id_with_uuids, rw_copy=False,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
e2e_orch_request = E2EOrchestratorRequest()
e2e_orch_request.service.CopyFrom(service_with_uuids)
e2e_orch_client = E2EOrchestratorClient()
e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request)
# Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies
# inter-dependencies among the services and connections retrieved and produces a
# schedule of tasks (an ordered list of tasks to be executed) to implement the
# requested create/update operation.
tasks_scheduler = TasksScheduler(self.service_handler_factory)
# e2e_orch_reply should be compatible with pathcomp_reply
# TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply()
tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False)
tasks_scheduler.execute_all()
return service_with_uuids.service_id
# Replace endpoints of the stored service with the requested ones.
del service.service_endpoint_ids[:] # pylint: disable=no-member
for endpoint_id in request.service_endpoint_ids:
service.service_endpoint_ids.add().CopyFrom(endpoint_id) # pylint: disable=no-member
device_list = context_client.ListDevices(Empty())
LOGGER.debug('[before] request={:s}'.format(grpc_message_to_json_string(request)))
# Resolve GPS-position location constraints to the closest device endpoint.
for constraint in request.service_constraints:
if constraint.action == ConstraintActionEnum.CONSTRAINTACTION_UNDEFINED:
# Field action is new; assume if not set, it means SET
constraint.action = ConstraintActionEnum.CONSTRAINTACTION_SET
if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
if constraint.WhichOneof('constraint') != 'endpoint_location': continue
if constraint.endpoint_location.HasField('endpoint_id'): continue
service_location = constraint.endpoint_location.location
distances = {}
# NOTE(review): 'gps_distance' is not imported in the visible file — its import
# was presumably lost in the paste; confirm module path.
for device in device_list.devices:
for endpoint in device.device_endpoints:
if not endpoint.endpoint_location.HasField('gps_position'): continue
distance = gps_distance(service_location.gps_position, endpoint.endpoint_location.gps_position)
distances[distance] = endpoint.endpoint_id
closer_endpoint_id = distances[min(distances)]
constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)
service_endpoint_ids = [
endpoint_id.endpoint_uuid
for endpoint_id in service.service_endpoint_ids
]
if closer_endpoint_id not in service_endpoint_ids:
service.service_endpoint_ids.append(closer_endpoint_id)
LOGGER.debug('[after] request={:s}'.format(grpc_message_to_json_string(request)))
LOGGER.debug('[after] service={:s}'.format(grpc_message_to_json_string(service)))
# Replace constraints and config rules of the stored service with the requested ones.
del service.service_constraints[:] # pylint: disable=no-member
for constraint in request.service_constraints:
service.service_constraints.add().CopyFrom(constraint) # pylint: disable=no-member
del service.service_config.config_rules[:] # pylint: disable=no-member
for config_rule in request.service_config.config_rules:
service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member
service_id_with_uuids = context_client.SetService(service)
# PathComp requires endpoints, constraints and config rules
service_with_uuids = get_service_by_id(
context_client, service_id_with_uuids, rw_copy=False,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths
num_expected_endpoints = num_disjoint_paths * 2
tasks_scheduler = TasksScheduler(self.service_handler_factory)
if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
# NOTE(review): the run of bare numbers below is a block of pasted line numbers that
# replaced lost original code — this chunk must be recovered from the repository.
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
topology_id_x = json_topology_id(
DEFAULT_TOPOLOGY_NAME, context_id_x)
topology_details = context_client.GetTopologyDetails(
TopologyId(**topology_id_x))
# devices = get_devices_in_topology(context_client, TopologyId(**topology_id_x), ContextId(**context_id_x))
devices = topology_details.devices
context_uuid_x = topology_details.topology_id.context_id.context_uuid.uuid
topology_uuid_x = topology_details.topology_id.topology_uuid.uuid
devs = []
ports = []
for endpoint_id in service.service_endpoint_ids:
devs.append(endpoint_id.device_id.device_uuid.uuid)
ports.append(endpoint_id.endpoint_uuid.uuid)
src = devs[0]
dst = devs[1]
bidir = None
ob_band = None
bitrate = 100
# Extract optical parameters from the custom constraints; bitrate defaults to 100.
for constraint in service.service_constraints:
if "bandwidth" in constraint.custom.constraint_type:
bitrate = int(float(constraint.custom.constraint_value))
elif "bidirectionality" in constraint.custom.constraint_type:
bidir = int(constraint.custom.constraint_value)
elif "optical-band-width" in constraint.custom.constraint_type:
ob_band = int(constraint.custom.constraint_value)
# to get the reply form the optical module
reply_txt = add_lightpath(src, dst, bitrate, bidir, ob_band)
# reply with 2 transponders and 2 roadms
reply_json = json.loads(reply_txt)
optical_band_txt = ""
if "new_optical_band" in reply_json.keys():
if reply_json["new_optical_band"] == 1:
# NOTE(review): 'parent_opt_band' is accessed on the line below BEFORE the key
# existence check on the following line — likely an ordering bug from the paste.
if reply_json["parent_opt_band"]:
if "parent_opt_band" in reply_json.keys():
parent_ob = reply_json["parent_opt_band"]
LOGGER.debug('Parent optical-band={}'.format(parent_ob))
optical_band_txt = get_optical_band(parent_ob)
LOGGER.info('optical-band details={}'.format(optical_band_txt))
else:
LOGGER.debug('expected optical band not found')
else:
# NOTE(review): duplicated 'else:' below is a syntax error introduced by the paste;
# one of the two branches (and its body) was lost.
else:
LOGGER.debug('Using existing optical band')
else:
LOGGER.debug('Using existing optical band')
if reply_txt is not None:
optical_reply = adapt_reply(
devices, _service, reply_json, context_uuid_x, topology_uuid_x, optical_band_txt
)
LOGGER.info('optical_reply={:s}'.format(
grpc_message_to_json_string(optical_reply)))
tasks_scheduler.compose_from_pathcompreply(
optical_reply, is_delete=False)
else:
if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
pathcomp_request = PathCompRequest()
pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member
if num_disjoint_paths is None or num_disjoint_paths in {0, 1} :
pathcomp_request.shortest_path.Clear() # pylint: disable=no-member
else:
pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member
pathcomp = PathCompClient()
pathcomp_reply = pathcomp.Compute(pathcomp_request)
pathcomp.close()
# Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
# the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
# executed) to implement the requested create/update operation.
tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)
# NOTE(review): the method ends here without executing the scheduler, setting the
# service ACTIVE or returning a ServiceId — the tail was presumably lost in the paste.
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
    """Delete a service and deconfigure its underlying resources.

    TE services are torn down through the TE component; optical services notify
    the optical module (delete_lightpath); in all remaining cases the
    TaskScheduler decomposes the delete into ordered tasks and executes them.

    Raises:
        Exception: the requested service does not exist.
    """
    # BUGFIX: context_client was used below without ever being created.
    context_client = ContextClient()

    # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components
    # are aware the service is being modified.
    service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
    if service is None:
        raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
    service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
    context_client.SetService(service)

    if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
        # TE service: delegate LSP teardown to the TE component, then remove the service.
        te_service_client = TEServiceClient()
        te_service_client.DeleteLSP(request)
        context_client.RemoveService(request)
        # BUGFIX: without this return, the code fell through and fed the scheduler
        # with a service that was just removed from the Context database.
        return Empty()

    if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
        # Optical service: notify the optical module to release the lightpath.
        devs = []
        context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
        topology_id_x = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id_x)
        topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id_x))
        devices = topology_details.devices
        for endpoint_id in service.service_endpoint_ids:
            devs.append(endpoint_id.device_id.device_uuid.uuid)
        src = get_device_name_from_uuid(devices, devs[0])
        dst = get_device_name_from_uuid(devices, devs[1])
        # First custom constraint carries the bitrate (see UpdateService).
        bitrate = int(float(service.service_constraints[0].custom.constraint_value))
        # BUGFIX: flow_id was referenced without being defined; recover it from the
        # first custom config rule where the optical flow data is stored.
        flow_id = None
        if len(service.service_config.config_rules) > 0:
            c_rules_dict = json.loads(
                service.service_config.config_rules[0].custom.resource_value)
            flow_id = c_rules_dict.get('flow_id')  # TODO confirm key name against OpticalTools
        delete_lightpath(flow_id, src, dst, bitrate)

    # Normal service:
    # Feed TaskScheduler with this service and the sub-services and sub-connections
    # related to this service. TaskScheduler identifies inter-dependencies among them
    # and produces a schedule of tasks (an ordered list of tasks to be executed) to
    # implement the requested delete operation.
    tasks_scheduler = TasksScheduler(self.service_handler_factory)
    tasks_scheduler.compose_from_service(service, is_delete=True)
    tasks_scheduler.execute_all()
    return Empty()
# NOTE(review): this method is corrupted by a copy/paste from a repository web UI:
# indentation is flattened and two runs of bare pasted line numbers replace lost
# original code. Code is preserved byte-for-byte; review comments mark each issue.
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
# Purpose (from visible code): replace the single active connection of an existing
# service with an alternative path computed by PathComp (k-shortest-path), chosen at
# random among candidates different from the current one.
# NOTE(review): the run of bare numbers below is a block of pasted line numbers that
# replaced lost original code — this chunk must be recovered from the repository.
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# Recomputation only rebuilds the path: changing endpoints/constraints/rules is out of scope.
if len(request.service_endpoint_ids) > 0:
raise NotImplementedException('update-endpoints')
if len(request.service_constraints) > 0:
raise NotImplementedException('update-constraints')
if len(request.service_config.config_rules) > 0:
raise NotImplementedException('update-config-rules')
context_client = ContextClient()
updated_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=True,
include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
if updated_service is None:
raise NotFoundException('service', request.service_id.service_uuid.uuid)
# pylint: disable=no-member
if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
raise InvalidArgumentException(
'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
)
# Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
# being modified.
# pylint: disable=no-member
updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING
# Update endpoints
# pylint: disable=no-member
#del updated_service.service_endpoint_ids[:]
#updated_service.service_endpoint_ids.extend(request.service_endpoint_ids)
# Update constraints
# pylint: disable=no-member
#del updated_service.service_constraints[:]
#updated_service.service_constraints.extend(request.service_constraints)
# Update config rules
# pylint: disable=no-member
#del updated_service.service_config.config_rules[:]
#updated_service.service_config.config_rules.extend(request.service_config.config_rules)
updated_service_id_with_uuids = context_client.SetService(updated_service)
# PathComp requires endpoints, constraints and config rules
updated_service_with_uuids = get_service_by_id(
context_client, updated_service_id_with_uuids, rw_copy=True,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
# Get active connection
connections = context_client.ListConnections(updated_service_id_with_uuids)
if len(connections.connections) == 0:
MSG = 'Service({:s}) has no connections'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_extra_details = MSG.format(str_service_id)
raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
if len(connections.connections) > 1:
MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
num_connections = len(connections.connections)
str_connections = grpc_message_to_json_string(connections)
str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)
old_connection = connections.connections[0]
if len(old_connection.sub_service_ids) > 0:
MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
str_connection = grpc_message_to_json_string(old_connection)
str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)
# Find alternative connections
# pylint: disable=no-member
pathcomp_request = PathCompRequest()
pathcomp_request.services.append(updated_service_with_uuids)
#pathcomp_request.k_disjoint_path.num_disjoint = 100
pathcomp_request.k_shortest_path.k_inspection = 100
pathcomp_request.k_shortest_path.k_return = 3
# NOTE(review): the run of bare numbers below is a block of pasted line numbers that
# replaced lost original code — this chunk must be recovered from the repository.
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
pathcomp = PathCompClient()
pathcomp_reply = pathcomp.Compute(pathcomp_request)
pathcomp.close()
LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
if len(pathcomp_reply.services) == 0:
MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)
if len(pathcomp_reply.services) > 1:
MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)
if len(pathcomp_reply.connections) == 0:
MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)
# compute a string representing the old connection
str_old_connection = connection_to_string(old_connection)
LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))
# NOTE(review): 'candidate_new_connections' is appended to below but never initialized
# in the visible code; its creation (e.g. an empty list) was presumably lost in one of
# the corrupted chunks above — confirm.
for candidate_new_connection in pathcomp_reply.connections:
str_candidate_new_connection = connection_to_string(candidate_new_connection)
if str_candidate_new_connection == str_old_connection: continue
candidate_new_connections.append(candidate_new_connection)
if len(candidate_new_connections) == 0:
MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_old_connection = grpc_message_to_json_string(old_connection)
extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
raise OperationFailedException('no-new-path-found', extra_details=extra_details)
str_candidate_new_connections = [
grpc_message_to_json_string(candidate_new_connection)
for candidate_new_connection in candidate_new_connections
]
LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))
new_connection = random.choice(candidate_new_connections)
LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))
# Change UUID of new connection to prevent collisions
tmp_connection = Connection()
tmp_connection.CopyFrom(new_connection)
tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
new_connection = tmp_connection
# Feed TaskScheduler with the service to update, the old connection to
# deconfigure and the new connection to configure. It will produce a
# schedule of tasks (an ordered list of tasks to be executed) to
# implement the requested changes.
tasks_scheduler = TasksScheduler(self.service_handler_factory)
tasks_scheduler.compose_service_connection_update(
updated_service_with_uuids, old_connection, new_connection)
tasks_scheduler.execute_all()
# NOTE(review): the method ends without the 'return Empty()' its signature promises —
# the tail is likely truncated at the end of this view; confirm against the repository.