-
Lluis Gifre Renom authoredLluis Gifre Renom authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ServiceServiceServicerImpl.py 19.82 KiB
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, json, logging, random, uuid
from typing import Optional
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
OperationFailedException)
from common.proto.context_pb2 import Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.tools.ConnectionToString import connection_to_string
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
from .tools.GeodesicDistance import gps_distance
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool passed, together with LOGGER, to the safe_and_metered_rpc_method
# decorator applied to the RPC methods of the servicer defined in this module.
METRICS_POOL = MetricsPool('Service', 'RPC')
class ServiceServiceServicerImpl(ServiceServiceServicer):
    """gRPC servicer exposing the CreateService, UpdateService, DeleteService and
    RecomputeConnections RPC methods.

    Each RPC opens its own ContextClient; path computation is delegated to
    PathCompClient and the resulting work is executed through a TasksScheduler
    built from the ServiceHandlerFactory given at construction time.
    """

    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
        """Store the factory later handed to every TasksScheduler instance."""
        LOGGER.debug('Creating Servicer...')
        self.service_handler_factory = service_handler_factory
        LOGGER.debug('Servicer Created')

    @staticmethod
    def _find_closest_endpoint_id(devices, gps_position):
        """Return the endpoint_id of the device endpoint closest to gps_position.

        Only endpoints whose endpoint_location carries a 'gps_position' are
        considered. Returns None when no such endpoint exists.
        """
        closest_distance = None
        closest_endpoint_id = None
        for device in devices:
            for endpoint in device.device_endpoints:
                if not endpoint.endpoint_location.HasField('gps_position'): continue
                distance = gps_distance(gps_position, endpoint.endpoint_location.gps_position)
                # "<=" (not "<") so that, on equal distances, the last endpoint visited wins,
                # as it did when candidates were stored in a dict keyed by distance.
                if closest_distance is None or distance <= closest_distance:
                    closest_distance = distance
                    closest_endpoint_id = endpoint.endpoint_id
        return closest_endpoint_id

    @staticmethod
    def _compute_paths(pathcomp_request : PathCompRequest):
        """Send pathcomp_request to PathComp and return its reply.

        The PathCompClient is closed even when Compute() raises.
        """
        LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
        pathcomp = PathCompClient()
        try:
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
        finally:
            pathcomp.close()
        LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
        return pathcomp_reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Register a new, empty service in the Context database.

        Raises:
            InvalidArgumentException: the request carries endpoints, constraints or
                config rules; those must be configured afterwards.
            AlreadyExistsException: a service with the requested service_id already exists.

        Returns:
            The ServiceId returned by ContextClient.SetService.
        """
        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = [
                grpc_message_to_json(service_endpoint_id)
                for service_endpoint_id in request.service_endpoint_ids
            ]
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = [
                grpc_message_to_json(service_constraint)
                for service_constraint in request.service_constraints
            ]
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

        # check that service does not exist
        context_client = ContextClient()
        current_service = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

        # just create the service in the Context database to lock the service_id
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Apply the endpoints, constraints and config rules of request to a service.

        The service is stored with status SERVICESTATUS_PLANNED, 'endpoint_location'
        constraints lacking an endpoint_id are resolved to the closest device endpoint
        having a GPS position, and, when enough endpoints are present, a path is
        computed and the resulting tasks are executed.

        Raises:
            OperationFailedException: an 'endpoint_location' constraint cannot be
                resolved because no device endpoint reports a GPS position.

        Returns:
            The service_id of the service as re-read from the Context database.
        """
        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
        context_client = ContextClient()
        _service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
        service = Service()
        service.CopyFrom(request if _service is None else _service)
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:                     # pylint: disable=no-member
            service.service_type = request.service_type                                     # pylint: disable=no-member
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED     # pylint: disable=no-member

        del service.service_endpoint_ids[:] # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)    # pylint: disable=no-member

        LOGGER.debug('[before] request={:s}'.format(grpc_message_to_json_string(request)))

        # The device list is only needed when some location constraint has to be resolved;
        # fetch it lazily and at most once.
        device_list = None
        for constraint in request.service_constraints:
            if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
            if constraint.WhichOneof('constraint') != 'endpoint_location': continue
            if constraint.endpoint_location.HasField('endpoint_id'): continue

            if device_list is None:
                device_list = context_client.ListDevices(Empty())

            service_location = constraint.endpoint_location.location
            closer_endpoint_id = self._find_closest_endpoint_id(
                device_list.devices, service_location.gps_position)
            if closer_endpoint_id is None:
                MSG = 'No device endpoint with a GPS position found for Constraint({:s})'
                str_extra_details = MSG.format(grpc_message_to_json_string(constraint))
                raise OperationFailedException('find-closest-endpoint', extra_details=str_extra_details)

            # The request's constraint is completed in place so that the copy made below carries the endpoint.
            constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)

            # service.service_endpoint_ids holds endpoint-id messages of the same kind as
            # closer_endpoint_id, so compare whole messages to avoid adding a duplicate.
            already_present = any(
                endpoint_id == closer_endpoint_id
                for endpoint_id in service.service_endpoint_ids     # pylint: disable=no-member
            )
            if not already_present:
                service.service_endpoint_ids.append(closer_endpoint_id)     # pylint: disable=no-member

        LOGGER.debug('[after] request={:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.debug('[after] service={:s}'.format(grpc_message_to_json_string(service)))

        del service.service_constraints[:]  # pylint: disable=no-member
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)  # pylint: disable=no-member

        del service.service_config.config_rules[:]  # pylint: disable=no-member
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member

        service_id_with_uuids = context_client.SetService(service)

        # PathComp requires endpoints, constraints and config rules
        service_with_uuids = get_service_by_id(
            context_client, service_id_with_uuids, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Number of disjoint paths comes from the first 'sla_availability' constraint; default is 1.
        num_disjoint_paths = 0
        for constraint in request.service_constraints:
            if constraint.WhichOneof('constraint') == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
                break
        if not num_disjoint_paths:
            num_disjoint_paths = 1

        # Each path needs two endpoints.
        num_expected_endpoints = num_disjoint_paths * 2

        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
            pathcomp_request = PathCompRequest()
            pathcomp_request.services.append(service_with_uuids)    # pylint: disable=no-member

            if num_disjoint_paths == 1:
                pathcomp_request.shortest_path.Clear()              # pylint: disable=no-member
            else:
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths  # pylint: disable=no-member

            pathcomp_reply = self._compute_paths(pathcomp_request)

            # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
            # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
            # executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
        return service_with_uuids.service_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        """Mark the service as SERVICESTATUS_PENDING_REMOVAL and execute its removal tasks.

        Raises:
            NotFoundException: no service exists for the given ServiceId.
        """
        context_client = ContextClient()

        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
        # being modified.
        service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
        if service is None:
            raise NotFoundException('Service', request.service_uuid.uuid)

        # pylint: disable=no-member
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
        context_client.SetService(service)

        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
        """Replace the single connection of a service with a different computed path.

        Raises:
            NotImplementedException: the request carries endpoints, constraints or config
                rules; the service has zero or several connections; the connection has
                sub-services; or PathComp replies with an unsupported result.
            NotFoundException: the service does not exist.
            InvalidArgumentException: the stored service has type SERVICETYPE_UNKNOWN.
            OperationFailedException: every path returned equals the current connection.
        """
        if len(request.service_endpoint_ids) > 0:
            raise NotImplementedException('update-endpoints')

        if len(request.service_constraints) > 0:
            raise NotImplementedException('update-constraints')

        if len(request.service_config.config_rules) > 0:
            raise NotImplementedException('update-config-rules')

        context_client = ContextClient()

        updated_service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=True,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)

        if updated_service is None:
            raise NotFoundException('service', request.service_id.service_uuid.uuid)

        # pylint: disable=no-member
        if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
            raise InvalidArgumentException(
                'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
            )

        # Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
        # being modified.
        # pylint: disable=no-member
        updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING

        # Endpoints, constraints and config rules are left as stored: requests that try to
        # change them are rejected at the top of this method.
        updated_service_id_with_uuids = context_client.SetService(updated_service)

        # PathComp requires endpoints, constraints and config rules
        updated_service_with_uuids = get_service_by_id(
            context_client, updated_service_id_with_uuids, rw_copy=True,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Get active connection
        connections = context_client.ListConnections(updated_service_id_with_uuids)

        if len(connections.connections) == 0:
            MSG = 'Service({:s}) has no connections'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_extra_details = MSG.format(str_service_id)
            raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
        if len(connections.connections) > 1:
            MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            num_connections = len(connections.connections)
            str_connections = grpc_message_to_json_string(connections)
            str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
            raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)

        old_connection = connections.connections[0]
        if len(old_connection.sub_service_ids) > 0:
            MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
            str_connection = grpc_message_to_json_string(old_connection)
            str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
            raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)

        # Find alternative connections
        # pylint: disable=no-member
        pathcomp_request = PathCompRequest()
        pathcomp_request.services.append(updated_service_with_uuids)
        pathcomp_request.k_shortest_path.k_inspection = 100
        pathcomp_request.k_shortest_path.k_return = 3

        pathcomp_reply = self._compute_paths(pathcomp_request)

        # NOTE(review): the messages/keys below mention "KDisjointPath" although the request above
        # uses k_shortest_path; the strings are kept unchanged in case clients match on them.
        if len(pathcomp_reply.services) == 0:
            MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)

        if len(pathcomp_reply.services) > 1:
            MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)

        if len(pathcomp_reply.connections) == 0:
            MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)

        # compute a string representing the old connection
        str_old_connection = connection_to_string(old_connection)

        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

        # Keep only the computed connections that differ from the one currently in use.
        candidate_new_connections = [
            candidate_new_connection
            for candidate_new_connection in pathcomp_reply.connections
            if connection_to_string(candidate_new_connection) != str_old_connection
        ]

        if len(candidate_new_connections) == 0:
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_old_connection_json = grpc_message_to_json_string(old_connection)
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection_json)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)

        str_candidate_new_connections = [
            grpc_message_to_json_string(candidate_new_connection)
            for candidate_new_connection in candidate_new_connections
        ]
        LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))

        new_connection = random.choice(candidate_new_connections)
        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
        tmp_connection = Connection()
        tmp_connection.CopyFrom(new_connection)
        tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
        new_connection = tmp_connection

        # Feed TaskScheduler with the service to update, the old connection to
        # deconfigure and the new connection to configure. It will produce a
        # schedule of tasks (an ordered list of tasks to be executed) to
        # implement the requested changes.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_service_connection_update(
            updated_service_with_uuids, old_connection, new_connection)
        tasks_scheduler.execute_all()

        return Empty()