Commit 06898aae authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- Renamed DependencyResolver to TaskScheduler
- Enhanced TaskScheduler with an extensible Task API framework and extensible dependency resolver
- Created TaskExecutor providing an extensible task execution environment
- Removed in-memory database; now is stateless and interrogates/updates Context when needed
- Extended gRPC servicer to use new TaskScheduler
- Removed unneeded files and re-organized helper methods
- Extended ServiceHandlerAPI; now SetEndpoint/DeleteEndpoint enables to configure/deconfigure a connection
- Moved ServiceHandlerFactory-related methods to the appropriate file
- Created ConnectionExpander to resolve from ERO-like paths to lists of links
- Created basic tasks: ServiceSetState, ServiceDelete, ConnectionConfigure, ConnectionDeconfigure
- Added skeleton for L2NMEmulatedHandler (to be adapted, now is a copy of L3NM one)
parent f3096852
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc

# Run unitary tests and analyze coverage of code at same time
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
    service/tests/test_unitary_dependency_resolver.py \
    service/tests/test_unitary_task_scheduler.py \
    service/tests/test_unitary.py
+0 −73
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import graphlib
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId
from common.proto.pathcomp_pb2 import PathCompReply

# Compose Directed Acyclic Graph of dependencies between connections and services
# retrieved by PathComp to create them in the appropriate order.

class ObjectType(Enum):
    CONNECTION = 'connection'
    SERVICE    = 'service'

ObjectKey  = Tuple[ObjectType, str]
ObjectId   = Union[ConnectionId, ServiceId]
ObjectData = Union[Connection, Service]
ObjectItem = Tuple[ObjectId, Optional[ObjectData]]
ObjectDict = Dict[ObjectKey, ObjectItem]
Resolution = List[Tuple[ObjectKey, ObjectItem]]

def get_connection_key(connection_id : ConnectionId) -> ObjectKey:
    connection_uuid = connection_id.connection_uuid.uuid
    return ObjectType.CONNECTION.value, connection_uuid

def get_service_key(service_id : ServiceId) -> ObjectKey:
    context_uuid = service_id.context_id.context_uuid.uuid
    service_uuid = service_id.service_uuid.uuid
    return ObjectType.SERVICE.value, '/'.join([context_uuid, service_uuid])

def resolve_dependencies(pathcomp_reply : PathCompReply) -> Resolution:
    dag = graphlib.TopologicalSorter()
    objects : ObjectDict = dict()

    for service in pathcomp_reply.services:
        service_key = get_service_key(service.service_id)
        objects[service_key] = (service.service_id, service)

    for connection in pathcomp_reply.connections:
        connection_key = get_connection_key(connection.connection_id)
        objects[connection_key] = (connection.connection_id, connection)

        # the connection's service depends on the connection
        service_key = get_service_key(connection.service_id)
        dag.add(service_key, connection_key)
        if service_key not in objects: objects[service_key] = (connection.service_id, None)

        # the connection depends on these sub-services
        for sub_service_id in connection.sub_service_ids:
            sub_service_key = get_service_key(sub_service_id)
            dag.add(connection_key, sub_service_key)
            if sub_service_key not in objects: objects[sub_service_key] = (sub_service_id, None)

    resolution : Resolution = list()
    for item_key in dag.static_order():
        item_tuple = objects.get(item_key)
        resolution.append((item_key, item_tuple))

    return resolution
+1 −5
Original line number Diff line number Diff line
@@ -14,9 +14,6 @@

from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from .ServiceServiceServicerImpl import ServiceServiceServicerImpl
@@ -26,8 +23,7 @@ class ServiceService(GenericGrpcService):
    def __init__(self, service_handler_factory : ServiceHandlerFactory, cls_name: str = __name__) -> None:
        port = get_service_port_grpc(ServiceNameEnum.SERVICE)
        super().__init__(port, cls_name=cls_name)
        database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.service_servicer = ServiceServiceServicerImpl(database, service_handler_factory)
        self.service_servicer = ServiceServiceServicerImpl(service_handler_factory)

    def install_servicers(self):
        add_ServiceServiceServicer_to_server(self.service_servicer, self.server)
+57 −88
Original line number Diff line number Diff line
@@ -12,30 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List
import graphlib, grpc, json, logging
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import ConnectionId, Empty, Service, ServiceId
import grpc, json, logging
from typing import Optional
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatusEnum
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException
from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Service import json_service_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.DependencyResolver import ObjectType, resolve_dependencies
from service.service.database.DeviceModel import DeviceModel
from .database.DatabaseServiceTools import (
    sync_service_from_context, sync_service_to_context, update_service_in_local_database)
from .database.ServiceModel import ServiceModel
from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity
from service.service.tools.ContextGetters import get_service
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .Tools import delete_service, get_connection, get_service, sync_devices_from_context, update_service
from .task_scheduler.TaskScheduler import TasksScheduler

LOGGER = logging.getLogger(__name__)

@@ -44,9 +33,8 @@ METHOD_NAMES = ['CreateService', 'UpdateService', 'DeleteService']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ServiceServiceServicerImpl(ServiceServiceServicer):
    def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None:
    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
        LOGGER.debug('Creating Servicer...')
        self.database = database
        self.service_handler_factory = service_handler_factory
        LOGGER.debug('Servicer Created')

@@ -96,7 +84,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

        # just create the service in the database to lock the service_id
        # just create the service in the Context database to lock the service_id
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id
@@ -105,82 +93,63 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        service_id = request.service_id
        service_uuid = service_id.service_uuid.uuid
        service_context_uuid = service_id.context_id.context_uuid.uuid

        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
        context_client = ContextClient()
        _service : Optional[Service] = get_service(context_client, request.service_id)
        service = Service()
        service.CopyFrom(request if _service is None else _service)
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
        context_client.SetService(service)

        num_disjoint_paths = None
        for constraint in request.service_constraints:
            if constraint.WhichOneof('constraint') == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
                break

        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        if len(request.service_endpoint_ids) >= (2 if num_disjoint_paths is None else 4):
            pathcomp_request = PathCompRequest()
            pathcomp_request.services.append(request)
        pathcomp_request.services.k_shortest_path.k_inspection = 5
        pathcomp_request.services.k_shortest_path.k_return = 5

        pathcomp = PathCompClient()
        pathcomp_response = pathcomp.Compute(pathcomp_request)
            if num_disjoint_paths is None:
                pathcomp_request.shortest_path.Clear()
            else:
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths

        # convert from a unordered lists of services and connections to a list of ordered items
        # that fulfill interdependencies among them. E.g., a service cannot be created if connections
        # supporting that service still does not exist.
        resolution = resolve_dependencies(pathcomp_response)
            pathcomp = PathCompClient()
            LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
            LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))

        # implement changes
        context_client = ContextClient()
        device_client = DeviceClient()
        for (obj_type, obj_key), (grpc_objid, grpc_obj) in resolution:
            if grpc_obj is None:
                # check if the resource already exists
                if obj_type == ObjectType.CONNECTION.value:
                    connection = get_connection(context_client, grpc_objid)
                    if connection is None: raise NotFoundException('Connection', obj_key)
                elif obj_type == ObjectType.SERVICE.value:
                    service = get_service(context_client, grpc_objid)
                    if service is None: raise NotFoundException('Service', obj_key)
                else:
                    MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}'
                    str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj)
                    str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid)
                    msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj)
                    raise NotImplementedException('Empty Dependency', extra_details=msg_extra_details)
            else:
                # create/update the resource
                if obj_type == ObjectType.CONNECTION.value:
                    update_connection(context_client, device_client, self.service_handler_factory, grpc_obj)
                    context_client.SetConnection(grpc_obj)
                elif obj_type == ObjectType.SERVICE.value:
                    update_service(context_client, device_client, self.service_handler_factory, grpc_obj)
                    context_client.SetService(grpc_obj)
                else:
                    MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}'
                    str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj)
                    str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid)
                    msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj)
                    raise NotImplementedException('Specified Dependency', extra_details=msg_extra_details)
            # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
            # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
            # executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
        return request.service_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        pce = PathComputationElement()
        pce.load_topology(self.context_client)
        pce.load_connectivity(self.context_client, request)
        #pce.dump_topology_to_file('../data/topo.dot')
        #pce.dump_connectivity_to_file('../data/conn-before.txt')
        connectivity = pce.get_connectivity_from_service_id(request)
        if connectivity is None: return Empty()
        #pce.dump_connectivity_to_file('../data/conn-after.txt')

        LOGGER.info('[DeleteService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        for connection in connectivity.get('connections'):
            delete_service(
                self.database, self.context_client, self.device_client, self.service_handler_factory,
                request, connection)

        for sub_service, sub_connections in connectivity.get('requirements', []):
            for sub_connection in sub_connections:
                delete_service(
                    self.database, self.context_client, self.device_client, self.service_handler_factory,
                    sub_service.service_id, sub_connection)
        context_client = ContextClient()

        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
        # being modified.
        _service : Optional[Service] = get_service(context_client, request)
        if _service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
        service = Service()
        service.CopyFrom(_service)
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
        context_client.SetService(service)

        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
        # tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()
        return Empty()

src/service/service/Tools.py

deleted100644 → 0
+0 −358

File deleted.

Preview size limit exceeded, changes collapsed.

Loading