# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging
from typing import Optional
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import AlreadyExistsException, InvalidArgumentException
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
from .tools.ContextGetters import get_service

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Service', 'RPC')

class ServiceServiceServicerImpl(ServiceServiceServicer):
    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
        LOGGER.debug('Creating Servicer...')
        self.service_handler_factory = service_handler_factory
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = []
            for service_endpoint_id in request.service_endpoint_ids:
                unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                    'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = []
            for service_constraint in request.service_constraints:
                unexpected_constraints.append(grpc_message_to_json(service_constraint))
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                    'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                    'Config Rules should be configured after creating the service.')

        # Check that the service does not exist yet.
        context_client = ContextClient()
        current_service = get_service(context_client, request.service_id)
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

        # Just create the service in the Context database to lock the service_id;
        # UpdateService will perform the changes on the underlying resources.
        service_id = context_client.SetService(request)
        return service_id
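
    # For reference, a minimal CreateService request that passes the checks above carries only the service identifier,
    # the type, and (optionally) the status; the values below are illustrative placeholders (JSON rendering of a
    # context.Service message):
    #   {
    #     "service_id": {
    #       "context_id": {"context_uuid": {"uuid": "admin"}},
    #       "service_uuid": {"uuid": "my-service"}
    #     },
    #     "service_type": "SERVICETYPE_L3NM",
    #     "service_status": {"service_status": "SERVICESTATUS_PLANNED"}
    #   }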

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        # Set the service status to "SERVICESTATUS_PLANNED" to ensure the rest of the components are aware that the
        # service is being modified.
        context_client = ContextClient()
        _service : Optional[Service] = get_service(context_client, request.service_id)
        service = Service()
        service.CopyFrom(request if _service is None else _service)
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:                    # pylint: disable=no-member
            service.service_type = request.service_type                                    # pylint: disable=no-member
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED    # pylint: disable=no-member

        # Overwrite the endpoints, constraints, and config rules stored in Context with those carried in the request.
        del service.service_endpoint_ids[:]                                                # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)                       # pylint: disable=no-member

        del service.service_constraints[:]                                                 # pylint: disable=no-member
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)                         # pylint: disable=no-member

        del service.service_config.config_rules[:]                                         # pylint: disable=no-member
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule)                # pylint: disable=no-member

        service_id_with_uuids = context_client.SetService(service)
        service_with_uuids = context_client.GetService(service_id_with_uuids)

        # If an SLA availability constraint is present, request the corresponding number of disjoint paths.
        num_disjoint_paths = None
        for constraint in request.service_constraints:
            if constraint.WhichOneof('constraint') == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
                break

        tasks_scheduler = TasksScheduler(self.service_handler_factory)

        # Trigger path computation only when enough endpoints are provided: at least 2 for a regular service, and at
        # least 4 when disjoint paths are requested.
        if len(service_with_uuids.service_endpoint_ids) >= (2 if num_disjoint_paths is None else 4):
            pathcomp_request = PathCompRequest()
            pathcomp_request.services.append(service_with_uuids)                           # pylint: disable=no-member

            if num_disjoint_paths is None:
                pathcomp_request.shortest_path.Clear()                                     # pylint: disable=no-member
            else:
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths         # pylint: disable=no-member

            LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
            pathcomp = PathCompClient()
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
            pathcomp.close()
            LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

            # Feed the TaskScheduler with this path computation reply. The TaskScheduler identifies inter-dependencies
            # among the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks
            # to be executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
        return service_with_uuids.service_id
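
    # For reference, the constraint inspected in UpdateService to request disjoint paths is the 'sla_availability'
    # oneof of context.Constraint; an illustrative JSON rendering (placeholder values) is:
    #   {"sla_availability": {"num_disjoint_paths": 2}}
    # When such a constraint is present, the path computation request selects the k-disjoint-path algorithm instead of
    # the shortest-path one.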

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        context_client = ContextClient()

        # Set the service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure the rest of the components are aware
        # that the service is being modified.
        _service : Optional[Service] = get_service(context_client, request)
        if _service is None:
            raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
        service = Service()
        service.CopyFrom(_service)
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL # pylint: disable=no-member
        context_client.SetService(service)

        # Feed the TaskScheduler with this service and the sub-services and sub-connections related to it. The
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()

        return Empty()
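
# ---------------------------------------------------------------------------------------------------------------------
# Illustrative sketch only; it is not used by the servicer above and is never invoked. It shows how a client is
# expected to drive the two-step workflow enforced by CreateService/UpdateService: CreateService only reserves the
# service_id, while endpoints, constraints, and config rules are supplied afterwards through UpdateService. The
# ServiceClient import and all identifiers below are assumptions for illustration; adapt them to the actual client
# wrapper and identifiers of your deployment.
def _example_create_then_update(context_uuid : str, service_uuid : str) -> None:   # pragma: no cover
    from service.client.ServiceClient import ServiceClient     # assumed client wrapper; verify the import path

    service_client = ServiceClient()

    # Step 1: create a skeleton Service carrying only its identifier and type; endpoints, constraints, or config
    # rules sent at this point would be rejected with an InvalidArgumentException.
    request = Service()
    request.service_id.context_id.context_uuid.uuid = context_uuid
    request.service_id.service_uuid.uuid = service_uuid
    request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM
    service_client.CreateService(request)

    # Step 2: resend the Service enriched with (at least two) endpoints and, optionally, constraints and config
    # rules, to trigger path computation and task scheduling in UpdateService.
    for device_uuid, endpoint_uuid in [('device-1', 'eth0'), ('device-2', 'eth0')]:     # placeholder identifiers
        endpoint_id = request.service_endpoint_ids.add()
        endpoint_id.device_id.device_uuid.uuid = device_uuid
        endpoint_id.endpoint_uuid.uuid = endpoint_uuid
    service_client.UpdateService(request)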