# ServiceServiceServicerImpl.py (header left over from the repository web viewer; not program code)
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List
import grpc, json, logging
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import Empty, Service, ServiceId
from common.proto.service_pb2_grpc import ServiceServiceServicer
# Lluis Gifre Renom committed (repository web-viewer annotation; not program code)
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.service.database.DeviceModel import DeviceModel
from .database.DatabaseServiceTools import (
    sync_service_from_context, sync_service_to_context, update_service_in_local_database)
from .database.ServiceModel import ServiceModel
from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .Tools import delete_service, sync_devices_from_context, update_service
# Lluis Gifre Renom committed (repository web-viewer annotation; not program code)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# METRICS is built once, at import time, for the RPC method names listed below.
# It is passed, together with LOGGER, to the safe_and_metered_rpc_method
# decorator applied to each RPC method of the servicer.
SERVICE_NAME = 'Service'
METHOD_NAMES = ['CreateService', 'UpdateService', 'DeleteService']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ServiceServiceServicerImpl(ServiceServiceServicer):
    # Lluis Gifre Renom committed (repository web-viewer annotation; not program code)
    def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None:
        """Store the collaborators used by the RPC methods.

        Args:
            database: database handle, kept as ``self.database``.
            service_handler_factory: factory kept as ``self.service_handler_factory``
                and passed through to ``update_service`` / ``delete_service``.

        A ``ContextClient`` and a ``DeviceClient`` are instantiated here, both
        without arguments, and kept as ``self.context_client`` and
        ``self.device_client``.
        """
        LOGGER.debug('Creating Servicer...')
        self.context_client = ContextClient()
        self.device_client = DeviceClient()
        self.database = database
        self.service_handler_factory = service_handler_factory
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Create a service that carries no endpoints, constraints or config rules.

        The request is rejected if any of ``service_endpoint_ids``,
        ``service_constraints`` or ``service_config.config_rules`` is non-empty;
        the error messages state that those must be configured after creation.
        Otherwise ``sync_service_from_context`` is called for the requested
        context/service UUIDs, the request is stored through
        ``update_service_in_local_database``, and the resulting record is pushed
        with ``sync_service_to_context``.

        Args:
            request: the Service message to create.
            context: gRPC servicer context (not used in this method body).

        Returns:
            A ServiceId built from ``db_service.dump_id()``.

        Raises:
            InvalidArgumentException: if the request carries endpoints,
                constraints or config rules. The offending items are included
                in the exception as a key-sorted JSON string.
        """
        LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        service_id = request.service_id
        service_uuid = service_id.service_uuid.uuid
        service_context_uuid = service_id.context_id.context_uuid.uuid

        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = [
                grpc_message_to_json(service_endpoint_id)
                for service_endpoint_id in request.service_endpoint_ids
            ]
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = [
                grpc_message_to_json(service_constraint)
                for service_constraint in request.service_constraints
            ]
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            # Here the whole service_config is converted and the 'config_rules'
            # entry extracted, instead of converting the rules one by one.
            unexpected_config_rules = grpc_message_to_json(request.service_config)['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

        sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
        db_service,_ = update_service_in_local_database(self.database, request)

        LOGGER.info('[CreateService] db_service = {:s}'.format(str(db_service.dump(
            include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))

        sync_service_to_context(db_service, self.context_client)
        return ServiceId(**db_service.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Update a service, applying any connectivity computed for it.

        A ``PathComputationElement`` loads topology and connectivity through the
        context client and ``route_service(request)`` is called on it.

        * If it returns ``None``: the devices for the requested endpoints are
          synchronised via ``sync_devices_from_context``, then
          ``sync_service_from_context`` is called, the request is stored with
          ``update_service_in_local_database`` and the record is pushed with
          ``sync_service_to_context``.
        * Otherwise: ``update_service`` is called for every connection of every
          entry under ``'requirements'`` first, then for every entry under
          ``'connections'`` with the request itself. The service record is
          then read back from the database.

        Args:
            request: the Service message with the desired state.
            context: gRPC servicer context (not used in this method body).

        Returns:
            A ServiceId built from ``db_service.dump_id()``.

        Raises:
            NotFoundException: if the service record cannot be found in the
                database when it is looked up.
        """
        LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        service_id = request.service_id
        service_uuid = service_id.service_uuid.uuid
        service_context_uuid = service_id.context_id.context_uuid.uuid

        pce = PathComputationElement()
        pce.load_topology(self.context_client)
        pce.load_connectivity(self.context_client, service_id)
        # Debug aids (disabled):
        #pce.dump_topology_to_file('../data/topo.dot')
        #pce.dump_connectivity_to_file('../data/conn-before.txt')
        connectivity = pce.route_service(request)
        #pce.dump_connectivity_to_file('../data/conn-after.txt')

        # NOTE(review): dump_connectivity() runs before the None check below, so it
        # must tolerate connectivity=None for the first branch to be reachable -- confirm.
        LOGGER.info('[UpdateService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        # Both branches look the service up under the same key.
        str_service_key = key_to_str([service_context_uuid, service_uuid])

        if connectivity is None:
            # just update local database and context
            db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
            # The lookup is non-raising; fail explicitly instead of hitting an
            # AttributeError on db_service.dump() below.
            if db_service is None: raise NotFoundException('Service', str_service_key)
            LOGGER.info('[UpdateService] before db_service = {:s}'.format(str(db_service.dump(
                include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
            db_devices : Dict[str, DeviceModel] = sync_devices_from_context(
                self.context_client, self.database, db_service, request.service_endpoint_ids)
            LOGGER.info('[UpdateService] db_devices[{:d}] = {:s}'.format(
                len(db_devices), str({
                    device_uuid:db_device.dump(include_config_rules=True, include_drivers=True, include_endpoints=True)
                    for device_uuid,db_device in db_devices.items()
                })))
            sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
            db_service,_ = update_service_in_local_database(self.database, request)
            LOGGER.info('[UpdateService] after db_service = {:s}'.format(str(db_service.dump(
                include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
            sync_service_to_context(db_service, self.context_client)
        else:
            # Sub-services listed under 'requirements' are updated before the
            # requested service's own connections.
            for sub_service, sub_connections in connectivity.get('requirements', []):
                for sub_connection in sub_connections:
                    update_service(
                        self.database, self.context_client, self.device_client, self.service_handler_factory,
                        sub_service, sub_connection)

            # A missing 'connections' key is treated as "no connections", the same
            # way 'requirements' is handled above. The return value of
            # update_service() is not kept: the record is re-read just below.
            for connection in connectivity.get('connections', []):
                update_service(
                    self.database, self.context_client, self.device_client, self.service_handler_factory,
                    request, connection)

            db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
            if db_service is None: raise NotFoundException('Service', str_service_key)

        LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump(
            include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))

        return ServiceId(**db_service.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        """Delete a service together with the connectivity recorded for it.

        A ``PathComputationElement`` loads topology and connectivity through the
        context client and is asked for the connectivity of the given service
        id. If there is none, nothing else is done. Otherwise ``delete_service``
        is called for every entry under ``'connections'`` with the request
        itself, and then for every connection of every entry under
        ``'requirements'`` with that sub-service's ``service_id``.

        Args:
            request: the ServiceId of the service to delete.
            context: gRPC servicer context (not used in this method body).

        Returns:
            An ``Empty`` message on every path.
        """
        LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        pce = PathComputationElement()
        pce.load_topology(self.context_client)
        pce.load_connectivity(self.context_client, request)
        # Debug aids (disabled):
        #pce.dump_topology_to_file('../data/topo.dot')
        #pce.dump_connectivity_to_file('../data/conn-before.txt')
        connectivity = pce.get_connectivity_from_service_id(request)
        if connectivity is None: return Empty()
        #pce.dump_connectivity_to_file('../data/conn-after.txt')

        LOGGER.info('[DeleteService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        # The service's own connections are removed first, then the sub-services
        # listed under 'requirements'. A missing key means nothing to remove.
        for connection in connectivity.get('connections', []):
            delete_service(
                self.database, self.context_client, self.device_client, self.service_handler_factory,
                request, connection)

        for sub_service, sub_connections in connectivity.get('requirements', []):
            for sub_connection in sub_connections:
                delete_service(
                    self.database, self.context_client, self.device_client, self.service_handler_factory,
                    sub_service.service_id, sub_connection)

        # Return the declared response type explicitly rather than falling off
        # the end of the method with None.
        return Empty()