Skip to content
Snippets Groups Projects
ServiceServiceServicerImpl.py 9.36 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging
from typing import Optional
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import AlreadyExistsException, InvalidArgumentException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from pathcomp.frontend.client.PathCompClient import PathCompClient
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .task_scheduler.TaskScheduler import TasksScheduler
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .tools.ContextGetters import get_service
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Service', 'RPC')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

class ServiceServiceServicerImpl(ServiceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Creating Servicer...')
        self.service_handler_factory = service_handler_factory
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = []
            for service_endpoint_id in request.service_endpoint_ids:
                unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = []
            for service_constraint in request.service_constraints:
                unexpected_constraints.append(grpc_message_to_json(service_constraint))
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # check that service does not exist
        context_client = ContextClient()
        current_service = get_service(context_client, request.service_id)
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # just create the service in the Context database to lock the service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        _service : Optional[Service] = get_service(context_client, request.service_id)
        service = Service()
        service.CopyFrom(request if _service is None else _service)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:                     # pylint: disable=no-member
            service.service_type = request.service_type                                     # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED     # pylint: disable=no-member

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_endpoint_ids[:] # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)    # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        del service.service_constraints[:]  # pylint: disable=no-member
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)  # pylint: disable=no-member

        del service.service_config.config_rules[:]  # pylint: disable=no-member
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        service_id_with_uuids = context_client.SetService(service)
        service_with_uuids = context_client.GetService(service_id_with_uuids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        num_disjoint_paths = None
        for constraint in request.service_constraints:
            if constraint.WhichOneof('constraint') == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
                break

        tasks_scheduler = TasksScheduler(self.service_handler_factory)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if len(service_with_uuids.service_endpoint_ids) >= (2 if num_disjoint_paths is None else 4):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp_request = PathCompRequest()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp_request.services.append(service_with_uuids)    # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            if num_disjoint_paths is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                pathcomp_request.shortest_path.Clear()              # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            else:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths  # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp = PathCompClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            pathcomp.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

            # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
            # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
            # executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return service_with_uuids.service_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
        # being modified.
        _service : Optional[Service] = get_service(context_client, request)
        if _service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
        service = Service()
        service.CopyFrom(_service)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
        context_client.SetService(service)

        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()