diff --git a/scripts/run_tests_locally-service.sh b/scripts/run_tests_locally-service.sh index 8a2a8d0be1d1960c6197a67e471ae29abba501a7..e2ccc3ebedfb2d465dff609b21bfe28e864fd711 100755 --- a/scripts/run_tests_locally-service.sh +++ b/scripts/run_tests_locally-service.sh @@ -21,4 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc # Run unitary tests and analyze coverage of code at same time coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + service/tests/test_unitary_dependency_resolver.py \ service/tests/test_unitary.py diff --git a/src/common/rpc_method_wrapper/ServiceExceptions.py b/src/common/rpc_method_wrapper/ServiceExceptions.py index f4f0a64cad79c96dc069bd37e8d2c2be5f011c53..e8d5c79acca19117fca53ec216166c01d3f0781d 100644 --- a/src/common/rpc_method_wrapper/ServiceExceptions.py +++ b/src/common/rpc_method_wrapper/ServiceExceptions.py @@ -56,3 +56,11 @@ class OperationFailedException(ServiceException): details = 'Operation({:s}) failed'.format(str(operation)) super().__init__(grpc.StatusCode.INTERNAL, details, extra_details=extra_details) + +class NotImplementedException(ServiceException): + def __init__( + self, operation : str, extra_details : Union[str, Iterable[str]] = None + ) -> None: + + details = 'Operation({:s}) not implemented'.format(str(operation)) + super().__init__(grpc.StatusCode.UNIMPLEMENTED, details, extra_details=extra_details) diff --git a/src/service/Dockerfile b/src/service/Dockerfile index c53a897821b759a8005118ba81a3a0f5c0b73c66..e469898e590b8797e8d3305e1c583caed41bfc80 100644 --- a/src/service/Dockerfile +++ b/src/service/Dockerfile @@ -64,6 +64,7 @@ RUN python3 -m pip install -r requirements.txt WORKDIR /var/teraflow COPY src/context/. context/ COPY src/device/. device/ +COPY src/pathcomp/frontend/. pathcomp/frontend/ COPY src/service/. service/ # Start the service diff --git a/src/service/service/DependencyResolver.py b/src/service/service/DependencyResolver.py new file mode 100644 index 0000000000000000000000000000000000000000..0bf5923c8bdc6257d32973df1cd4ee038d187fae --- /dev/null +++ b/src/service/service/DependencyResolver.py @@ -0,0 +1,73 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import graphlib +from enum import Enum +from typing import Dict, List, Optional, Tuple, Union +from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId +from common.proto.pathcomp_pb2 import PathCompReply + +# Compose Directed Acyclic Graph of dependencies between connections and services +# retrieved by PathComp to create them in the appropriate order. + +class ObjectType(Enum): + CONNECTION = 'connection' + SERVICE = 'service' + +ObjectKey = Tuple[ObjectType, str] +ObjectId = Union[ConnectionId, ServiceId] +ObjectData = Union[Connection, Service] +ObjectItem = Tuple[ObjectId, Optional[ObjectData]] +ObjectDict = Dict[ObjectKey, ObjectItem] +Resolution = List[Tuple[ObjectKey, ObjectItem]] + +def get_connection_key(connection_id : ConnectionId) -> ObjectKey: + connection_uuid = connection_id.connection_uuid.uuid + return ObjectType.CONNECTION.value, connection_uuid + +def get_service_key(service_id : ServiceId) -> ObjectKey: + context_uuid = service_id.context_id.context_uuid.uuid + service_uuid = service_id.service_uuid.uuid + return ObjectType.SERVICE.value, '/'.join([context_uuid, service_uuid]) + +def resolve_dependencies(pathcomp_reply : PathCompReply) -> Resolution: + dag = graphlib.TopologicalSorter() + objects : ObjectDict = dict() + + for service in pathcomp_reply.services: + service_key = get_service_key(service.service_id) + objects[service_key] = (service.service_id, service) + + for connection in pathcomp_reply.connections: + connection_key = get_connection_key(connection.connection_id) + objects[connection_key] = (connection.connection_id, connection) + + # the connection's service depends on the connection + service_key = get_service_key(connection.service_id) + dag.add(service_key, connection_key) + if service_key not in objects: objects[service_key] = (connection.service_id, None) + + # the connection depends on these sub-services + for sub_service_id in connection.sub_service_ids: + sub_service_key = get_service_key(sub_service_id) + dag.add(connection_key, sub_service_key) + if sub_service_key not in objects: objects[sub_service_key] = (sub_service_id, None) + + resolution : Resolution = list() + for item_key in dag.static_order(): + item_tuple = objects.get(item_key) + resolution.append((item_key, item_tuple)) + + return resolution diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 6355cafbef0fc65338269df064a0f56e115b746e..2591f5bda5e8516652aca9d4719dc142295984ef 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -13,24 +13,29 @@ # limitations under the License. from typing import Dict, List -import grpc, json, logging +import graphlib, grpc, json, logging from common.orm.Database import Database from common.orm.HighLevel import get_object from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import Empty, Service, ServiceId +from common.proto.context_pb2 import ConnectionId, Empty, Service, ServiceId +from common.proto.pathcomp_pb2 import PathCompRequest from common.proto.service_pb2_grpc import ServiceServiceServicer from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException +from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +from common.tools.object_factory.Connection import json_connection_id +from common.tools.object_factory.Service import json_service_id from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient +from pathcomp.frontend.client.PathCompClient import PathCompClient +from service.service.DependencyResolver import ObjectType, resolve_dependencies from service.service.database.DeviceModel import DeviceModel from .database.DatabaseServiceTools import ( sync_service_from_context, sync_service_to_context, update_service_in_local_database) from .database.ServiceModel import ServiceModel from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory -from .Tools import delete_service, sync_devices_from_context, update_service +from .Tools import delete_service, get_connection, get_service, sync_devices_from_context, update_service LOGGER = logging.getLogger(__name__) @@ -41,8 +46,6 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class ServiceServiceServicerImpl(ServiceServiceServicer): def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None: LOGGER.debug('Creating Servicer...') - self.context_client = ContextClient() - self.device_client = DeviceClient() self.database = database self.service_handler_factory = service_handler_factory LOGGER.debug('Servicer Created') @@ -84,14 +87,19 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): extra_details='RPC method CreateService does not accept Config Rules. '\ 'Config Rules should be configured after creating the service.') - sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database) - db_service,_ = update_service_in_local_database(self.database, request) + # check that service does not exist + context_client = ContextClient() + current_service = get_service(context_client, request.service_id) + if current_service is not None: + context_uuid = request.service_id.context_id.context_uuid.uuid + service_uuid = request.service_id.service_uuid.uuid + raise AlreadyExistsException( + 'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid))) - LOGGER.info('[CreateService] db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - - sync_service_to_context(db_service, self.context_client) - return ServiceId(**db_service.dump_id()) + # just create the service in the database to lock the service_id + # update will perform changes on the resources + service_id = context_client.SetService(request) + return service_id @safe_and_metered_rpc_method(METRICS, LOGGER) def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: @@ -101,54 +109,53 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): service_uuid = service_id.service_uuid.uuid service_context_uuid = service_id.context_id.context_uuid.uuid - pce = PathComputationElement() - pce.load_topology(self.context_client) - pce.load_connectivity(self.context_client, service_id) - #pce.dump_topology_to_file('../data/topo.dot') - #pce.dump_connectivity_to_file('../data/conn-before.txt') - connectivity = pce.route_service(request) - #pce.dump_connectivity_to_file('../data/conn-after.txt') - - LOGGER.info('[UpdateService] connectivity = {:s}'.format(str(dump_connectivity(connectivity)))) - - if connectivity is None: - # just update local database and context - str_service_key = key_to_str([service_context_uuid, service_uuid]) - db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False) - LOGGER.info('[UpdateService] before db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - db_devices : Dict[str, DeviceModel] = sync_devices_from_context( - self.context_client, self.database, db_service, request.service_endpoint_ids) - LOGGER.info('[UpdateService] db_devices[{:d}] = {:s}'.format( - len(db_devices), str({ - device_uuid:db_device.dump(include_config_rules=True, include_drivers=True, include_endpoints=True) - for device_uuid,db_device in db_devices.items() - }))) - sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database) - db_service,_ = update_service_in_local_database(self.database, request) - LOGGER.info('[UpdateService] after db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - sync_service_to_context(db_service, self.context_client) - else: - for sub_service, sub_connections in connectivity.get('requirements', []): - for sub_connection in sub_connections: - update_service( - self.database, self.context_client, self.device_client, self.service_handler_factory, - sub_service, sub_connection) - - for connection in connectivity.get('connections'): - db_service = update_service( - self.database, self.context_client, self.device_client, self.service_handler_factory, - request, connection) - - str_service_key = key_to_str([service_context_uuid, service_uuid]) - db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False) - if db_service is None: raise NotFoundException('Service', str_service_key) - - LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - - return ServiceId(**db_service.dump_id()) + pathcomp_request = PathCompRequest() + pathcomp_request.services.append(request) + pathcomp_request.services.k_shortest_path.k_inspection = 5 + pathcomp_request.services.k_shortest_path.k_return = 5 + + pathcomp = PathCompClient() + pathcomp_response = pathcomp.Compute(pathcomp_request) + + # convert from a unordered lists of services and connections to a list of ordered items + # that fulfill interdependencies among them. E.g., a service cannot be created if connections + # supporting that service still does not exist. + resolution = resolve_dependencies(pathcomp_response) + + # implement changes + context_client = ContextClient() + device_client = DeviceClient() + for (obj_type, obj_key), (grpc_objid, grpc_obj) in resolution: + if grpc_obj is None: + # check if the resource already exists + if obj_type == ObjectType.CONNECTION.value: + connection = get_connection(context_client, grpc_objid) + if connection is None: raise NotFoundException('Connection', obj_key) + elif obj_type == ObjectType.SERVICE.value: + service = get_service(context_client, grpc_objid) + if service is None: raise NotFoundException('Service', obj_key) + else: + MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}' + str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj) + str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid) + msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj) + raise NotImplementedException('Empty Dependency', extra_details=msg_extra_details) + else: + # create/update the resource + if obj_type == ObjectType.CONNECTION.value: + update_connection(context_client, device_client, self.service_handler_factory, grpc_obj) + context_client.SetConnection(grpc_obj) + elif obj_type == ObjectType.SERVICE.value: + update_service(context_client, device_client, self.service_handler_factory, grpc_obj) + context_client.SetService(grpc_obj) + else: + MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}' + str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj) + str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid) + msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj) + raise NotImplementedException('Specified Dependency', extra_details=msg_extra_details) + + return request.service_id @safe_and_metered_rpc_method(METRICS, LOGGER) def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: diff --git a/src/service/service/Tools.py b/src/service/service/Tools.py index 4386793c52a979cd0b3d86701a3476314857f3ac..ea4369fd5f909cc5710a366a9e413af059e0eb13 100644 --- a/src/service/service/Tools.py +++ b/src/service/service/Tools.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +import grpc, logging from typing import Any, Dict, List, Optional, Tuple from common.orm.Database import Database from common.orm.HighLevel import get_object, get_related_objects from common.orm.backend.Tools import key_to_str from common.proto.context_pb2 import ( - ConfigRule, Connection, Constraint, EndPointId, Service, ServiceId, ServiceStatusEnum) + ConfigRule, Connection, ConnectionId, Constraint, EndPointId, Service, ServiceId, ServiceStatusEnum) from common.rpc_method_wrapper.ServiceExceptions import ( InvalidArgumentException, NotFoundException, OperationFailedException) from context.client.ContextClient import ContextClient @@ -42,6 +42,22 @@ from .service_handler_api.Tools import ( LOGGER = logging.getLogger(__name__) +def get_connection(context_client : ContextClient, connection_id : ConnectionId) -> Optional[Connection]: + try: + connection : Connection = context_client.GetConnection(connection_id) + return connection + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None + +def get_service(context_client : ContextClient, service_id : ServiceId) -> Optional[Service]: + try: + service : Service = context_client.GetService(service_id) + return service + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None + def sync_devices_from_context( context_client : ContextClient, database : Database, db_service : Optional[ServiceModel], service_endpoint_ids : List[EndPointId] diff --git a/src/service/service/__main__.py b/src/service/service/__main__.py index 1a67a309ff19bda2bf3174c80dfb908e99f72d14..2c042fe0e9f4496f1dc4c34f901e099b3434969b 100644 --- a/src/service/service/__main__.py +++ b/src/service/service/__main__.py @@ -37,10 +37,12 @@ def main(): LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) diff --git a/src/service/tests/test_unitary_dependency_resolver.py b/src/service/tests/test_unitary_dependency_resolver.py new file mode 100644 index 0000000000000000000000000000000000000000..1dd70ba7e628ff25adf4d474e829136dea779aaa --- /dev/null +++ b/src/service/tests/test_unitary_dependency_resolver.py @@ -0,0 +1,98 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, operator +from common.proto.context_pb2 import Connection, Service +from common.proto.pathcomp_pb2 import PathCompReply +from common.tools.grpc.Tools import grpc_message_to_json_string +from service.service.DependencyResolver import resolve_dependencies + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +def test_dependency_resolver(): + # test: add services and connections that depend on each other + # then, check if they are properly resolved. + # - service MAIN, depends on connection PKT-1, TAPI, and PKT-2 + # - connection PKT-1, depends on nothing + # - connection TAPI, depends on service TAPI-1 and TAPI-2 + # - connection PKT-2, depends on nothing + # - service TAPI-1, depends on connection TAPI-1 + # - service TAPI-2, depends on connection TAPI-2 + + pathcomp_reply = PathCompReply() + + service_main = pathcomp_reply.services.add() + service_main.service_id.context_id.context_uuid.uuid = 'admin' + service_main.service_id.service_uuid.uuid = 'MAIN' + + service_tapi1 = pathcomp_reply.services.add() + service_tapi1.service_id.context_id.context_uuid.uuid = 'admin' + service_tapi1.service_id.service_uuid.uuid = 'TAPI-1' + + service_tapi2 = pathcomp_reply.services.add() + service_tapi2.service_id.context_id.context_uuid.uuid = 'admin' + service_tapi2.service_id.service_uuid.uuid = 'TAPI-2' + + connection_pkt1 = pathcomp_reply.connections.add() + connection_pkt1.connection_id.connection_uuid.uuid = 'PKT-1' + connection_pkt1.service_id.CopyFrom(service_main.service_id) + + connection_tapi = pathcomp_reply.connections.add() + connection_tapi.connection_id.connection_uuid.uuid = 'TAPI' + connection_tapi.service_id.CopyFrom(service_main.service_id) + + connection_pkt2 = pathcomp_reply.connections.add() + connection_pkt2.connection_id.connection_uuid.uuid = 'PKT-2' + connection_pkt2.service_id.CopyFrom(service_main.service_id) + + connection_tapi1 = pathcomp_reply.connections.add() + connection_tapi1.connection_id.connection_uuid.uuid = 'TAPI-1' + connection_tapi1.service_id.CopyFrom(service_tapi1.service_id) + connection_tapi.sub_service_ids.append(service_tapi1.service_id) + + connection_tapi2 = pathcomp_reply.connections.add() + connection_tapi2.connection_id.connection_uuid.uuid = 'TAPI-2' + connection_tapi2.service_id.CopyFrom(service_tapi2.service_id) + connection_tapi.sub_service_ids.append(service_tapi2.service_id) + + LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) + resolution = resolve_dependencies(pathcomp_reply) + LOGGER.info('resolution={:s}'.format(str(list(map(operator.itemgetter(0), resolution))))) + + CORRECT_RESOLUTION_KEYS = [ + ('connection', 'PKT-1' ), + ('connection', 'PKT-2' ), + ('connection', 'TAPI-1' ), + ('connection', 'TAPI-2' ), + ('service' , 'admin/TAPI-1'), + ('service' , 'admin/TAPI-2'), + ('connection', 'TAPI' ), + ('service' , 'admin/MAIN' ), + ] + for (resolved_key,(resolved_objid, resolved_obj)),correct_key in zip(resolution, CORRECT_RESOLUTION_KEYS): + assert resolved_key == correct_key + assert resolved_obj is not None + if resolved_key[0] == 'connection': + assert isinstance(resolved_obj, Connection) + assert resolved_objid == resolved_obj.connection_id + connection_key = resolved_obj.connection_id.connection_uuid.uuid + assert resolved_key[1] == connection_key + elif resolved_key[0] == 'service': + assert isinstance(resolved_obj, Service) + assert resolved_objid == resolved_obj.service_id + context_uuid = resolved_obj.service_id.context_id.context_uuid.uuid + service_uuid = resolved_obj.service_id.service_uuid.uuid + service_key = '/'.join([context_uuid, service_uuid]) + assert resolved_key[1] == service_key