From 06898aae4eca1a7f8386f5720d32634ad98de4b6 Mon Sep 17 00:00:00 2001 From: Lluis Gifre <lluis.gifre@cttc.es> Date: Sun, 11 Sep 2022 15:10:29 +0000 Subject: [PATCH] Service component: - Renamed DependencyResolver to TaskScheduler - Enhanced TaskScheduler with an extensible Task API framework and extensible dependency resolver - Created TaskExecutor providing an extensible task execution environment - Removed in-memory database; now is stateless and interrogates/updates Context when needed - Extended gRPC servicer to use new TaskScheduler - Removed unneeded files and re-organized helper methods - Extended ServiceHandlerAPI; now SetEndpoint/DeleteEndpoint enables to configure/deconfigure a connection - Moved ServiceHandlerFactory-related methods to the appropriate file - Created ConnectionExpander to resolve from ERO-like paths to lists of links - Created basic tasks: ServiceSetState, ServiceDelete, ConnectionConfigure, ConnectionDeconfigure - Added skeleton for L2NMEmulatedHandler (to be adapted, now is a copy of L3NM one) --- scripts/run_tests_locally-service.sh | 2 +- src/service/service/DependencyResolver.py | 73 ---- src/service/service/ServiceService.py | 6 +- .../service/ServiceServiceServicerImpl.py | 145 +++---- src/service/service/Tools.py | 358 ------------------ .../service_handler_api/FilterFields.py | 23 +- .../ServiceHandlerFactory.py | 41 +- .../service_handler_api/_ServiceHandler.py | 12 +- .../service/service_handlers/__init__.py | 19 +- .../l2nm_emulated/ConfigRules.py | 268 +++++++++++++ .../L2NMEmulatedServiceHandler.py | 296 ++------------- .../task_scheduler/ConnectionExpander.py | 66 ++++ .../service/task_scheduler/TaskExecutor.py | 142 +++++++ .../service/task_scheduler/TaskScheduler.py | 179 +++++++++ .../service/task_scheduler/__init__.py | 51 +++ .../tasks/Task_ConnectionConfigure.py | 63 +++ .../tasks/Task_ConnectionDeconfigure.py | 63 +++ .../tasks/Task_ServiceDelete.py | 39 ++ .../tasks/Task_ServiceSetStatus.py | 46 +++ .../service/task_scheduler/tasks/_Task.py | 30 ++ .../service/task_scheduler/tasks/__init__.py | 14 + src/service/service/tools/ContextGetters.py | 42 ++ .../service/tools/EndpointIdFormatters.py | 27 ++ src/service/service/tools/ObjectKeys.py | 26 ++ src/service/service/tools/__init__.py | 14 + ...lver.py => test_unitary_task_scheduler.py} | 60 ++- 26 files changed, 1276 insertions(+), 829 deletions(-) delete mode 100644 src/service/service/DependencyResolver.py delete mode 100644 src/service/service/Tools.py create mode 100644 src/service/service/service_handlers/l2nm_emulated/ConfigRules.py create mode 100644 src/service/service/task_scheduler/ConnectionExpander.py create mode 100644 src/service/service/task_scheduler/TaskExecutor.py create mode 100644 src/service/service/task_scheduler/TaskScheduler.py create mode 100644 src/service/service/task_scheduler/__init__.py create mode 100644 src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py create mode 100644 src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py create mode 100644 src/service/service/task_scheduler/tasks/Task_ServiceDelete.py create mode 100644 src/service/service/task_scheduler/tasks/Task_ServiceSetStatus.py create mode 100644 src/service/service/task_scheduler/tasks/_Task.py create mode 100644 src/service/service/task_scheduler/tasks/__init__.py create mode 100644 src/service/service/tools/ContextGetters.py create mode 100644 src/service/service/tools/EndpointIdFormatters.py create mode 100644 src/service/service/tools/ObjectKeys.py create mode 100644 src/service/service/tools/__init__.py rename src/service/tests/{test_unitary_dependency_resolver.py => test_unitary_task_scheduler.py} (65%) diff --git a/scripts/run_tests_locally-service.sh b/scripts/run_tests_locally-service.sh index e2ccc3ebe..8816b9faa 100755 --- a/scripts/run_tests_locally-service.sh +++ b/scripts/run_tests_locally-service.sh @@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc # Run unitary tests and analyze coverage of code at same time coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - service/tests/test_unitary_dependency_resolver.py \ + service/tests/test_unitary_task_scheduler.py \ service/tests/test_unitary.py diff --git a/src/service/service/DependencyResolver.py b/src/service/service/DependencyResolver.py deleted file mode 100644 index 0bf5923c8..000000000 --- a/src/service/service/DependencyResolver.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import graphlib -from enum import Enum -from typing import Dict, List, Optional, Tuple, Union -from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId -from common.proto.pathcomp_pb2 import PathCompReply - -# Compose Directed Acyclic Graph of dependencies between connections and services -# retrieved by PathComp to create them in the appropriate order. - -class ObjectType(Enum): - CONNECTION = 'connection' - SERVICE = 'service' - -ObjectKey = Tuple[ObjectType, str] -ObjectId = Union[ConnectionId, ServiceId] -ObjectData = Union[Connection, Service] -ObjectItem = Tuple[ObjectId, Optional[ObjectData]] -ObjectDict = Dict[ObjectKey, ObjectItem] -Resolution = List[Tuple[ObjectKey, ObjectItem]] - -def get_connection_key(connection_id : ConnectionId) -> ObjectKey: - connection_uuid = connection_id.connection_uuid.uuid - return ObjectType.CONNECTION.value, connection_uuid - -def get_service_key(service_id : ServiceId) -> ObjectKey: - context_uuid = service_id.context_id.context_uuid.uuid - service_uuid = service_id.service_uuid.uuid - return ObjectType.SERVICE.value, '/'.join([context_uuid, service_uuid]) - -def resolve_dependencies(pathcomp_reply : PathCompReply) -> Resolution: - dag = graphlib.TopologicalSorter() - objects : ObjectDict = dict() - - for service in pathcomp_reply.services: - service_key = get_service_key(service.service_id) - objects[service_key] = (service.service_id, service) - - for connection in pathcomp_reply.connections: - connection_key = get_connection_key(connection.connection_id) - objects[connection_key] = (connection.connection_id, connection) - - # the connection's service depends on the connection - service_key = get_service_key(connection.service_id) - dag.add(service_key, connection_key) - if service_key not in objects: objects[service_key] = (connection.service_id, None) - - # the connection depends on these sub-services - for sub_service_id in connection.sub_service_ids: - sub_service_key = get_service_key(sub_service_id) - dag.add(connection_key, sub_service_key) - if sub_service_key not in objects: objects[sub_service_key] = (sub_service_id, None) - - resolution : Resolution = list() - for item_key in dag.static_order(): - item_tuple = objects.get(item_key) - resolution.append((item_key, item_tuple)) - - return resolution diff --git a/src/service/service/ServiceService.py b/src/service/service/ServiceService.py index b15237625..2f44fe018 100644 --- a/src/service/service/ServiceService.py +++ b/src/service/service/ServiceService.py @@ -14,9 +14,6 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from common.orm.backend.BackendEnum import BackendEnum -from common.orm.Database import Database -from common.orm.Factory import get_database_backend from common.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server from common.tools.service.GenericGrpcService import GenericGrpcService from .ServiceServiceServicerImpl import ServiceServiceServicerImpl @@ -26,8 +23,7 @@ class ServiceService(GenericGrpcService): def __init__(self, service_handler_factory : ServiceHandlerFactory, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.SERVICE) super().__init__(port, cls_name=cls_name) - database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) - self.service_servicer = ServiceServiceServicerImpl(database, service_handler_factory) + self.service_servicer = ServiceServiceServicerImpl(service_handler_factory) def install_servicers(self): add_ServiceServiceServicer_to_server(self.service_servicer, self.server) diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 2591f5bda..bc71168f6 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -12,30 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List -import graphlib, grpc, json, logging -from common.orm.Database import Database -from common.orm.HighLevel import get_object -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import ConnectionId, Empty, Service, ServiceId +import grpc, json, logging +from typing import Optional +from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatusEnum from common.proto.pathcomp_pb2 import PathCompRequest from common.proto.service_pb2_grpc import ServiceServiceServicer from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException +from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string -from common.tools.object_factory.Connection import json_connection_id -from common.tools.object_factory.Service import json_service_id from context.client.ContextClient import ContextClient -from device.client.DeviceClient import DeviceClient from pathcomp.frontend.client.PathCompClient import PathCompClient -from service.service.DependencyResolver import ObjectType, resolve_dependencies -from service.service.database.DeviceModel import DeviceModel -from .database.DatabaseServiceTools import ( - sync_service_from_context, sync_service_to_context, update_service_in_local_database) -from .database.ServiceModel import ServiceModel -from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity +from service.service.tools.ContextGetters import get_service from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory -from .Tools import delete_service, get_connection, get_service, sync_devices_from_context, update_service +from .task_scheduler.TaskScheduler import TasksScheduler LOGGER = logging.getLogger(__name__) @@ -44,9 +33,8 @@ METHOD_NAMES = ['CreateService', 'UpdateService', 'DeleteService'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class ServiceServiceServicerImpl(ServiceServiceServicer): - def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None: + def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None: LOGGER.debug('Creating Servicer...') - self.database = database self.service_handler_factory = service_handler_factory LOGGER.debug('Servicer Created') @@ -96,7 +84,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): raise AlreadyExistsException( 'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid))) - # just create the service in the database to lock the service_id + # just create the service in the Context database to lock the service_id # update will perform changes on the resources service_id = context_client.SetService(request) return service_id @@ -105,82 +93,63 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) - service_id = request.service_id - service_uuid = service_id.service_uuid.uuid - service_context_uuid = service_id.context_id.context_uuid.uuid - - pathcomp_request = PathCompRequest() - pathcomp_request.services.append(request) - pathcomp_request.services.k_shortest_path.k_inspection = 5 - pathcomp_request.services.k_shortest_path.k_return = 5 - - pathcomp = PathCompClient() - pathcomp_response = pathcomp.Compute(pathcomp_request) - - # convert from a unordered lists of services and connections to a list of ordered items - # that fulfill interdependencies among them. E.g., a service cannot be created if connections - # supporting that service still does not exist. - resolution = resolve_dependencies(pathcomp_response) - - # implement changes + # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is + # being modified. context_client = ContextClient() - device_client = DeviceClient() - for (obj_type, obj_key), (grpc_objid, grpc_obj) in resolution: - if grpc_obj is None: - # check if the resource already exists - if obj_type == ObjectType.CONNECTION.value: - connection = get_connection(context_client, grpc_objid) - if connection is None: raise NotFoundException('Connection', obj_key) - elif obj_type == ObjectType.SERVICE.value: - service = get_service(context_client, grpc_objid) - if service is None: raise NotFoundException('Service', obj_key) - else: - MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}' - str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj) - str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid) - msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj) - raise NotImplementedException('Empty Dependency', extra_details=msg_extra_details) + _service : Optional[Service] = get_service(context_client, request.service_id) + service = Service() + service.CopyFrom(request if _service is None else _service) + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + context_client.SetService(service) + + num_disjoint_paths = None + for constraint in request.service_constraints: + if constraint.WhichOneof('constraint') == 'sla_availability': + num_disjoint_paths = constraint.sla_availability.num_disjoint_paths + break + + tasks_scheduler = TasksScheduler(self.service_handler_factory) + if len(request.service_endpoint_ids) >= (2 if num_disjoint_paths is None else 4): + pathcomp_request = PathCompRequest() + pathcomp_request.services.append(request) + + if num_disjoint_paths is None: + pathcomp_request.shortest_path.Clear() else: - # create/update the resource - if obj_type == ObjectType.CONNECTION.value: - update_connection(context_client, device_client, self.service_handler_factory, grpc_obj) - context_client.SetConnection(grpc_obj) - elif obj_type == ObjectType.SERVICE.value: - update_service(context_client, device_client, self.service_handler_factory, grpc_obj) - context_client.SetService(grpc_obj) - else: - MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}' - str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj) - str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid) - msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj) - raise NotImplementedException('Specified Dependency', extra_details=msg_extra_details) + pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths + pathcomp = PathCompClient() + LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) + pathcomp_reply = pathcomp.Compute(pathcomp_request) + LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) + + # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among + # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be + # executed) to implement the requested create/update operation. + tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) + + tasks_scheduler.execute_all() return request.service_id @safe_and_metered_rpc_method(METRICS, LOGGER) def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) - pce = PathComputationElement() - pce.load_topology(self.context_client) - pce.load_connectivity(self.context_client, request) - #pce.dump_topology_to_file('../data/topo.dot') - #pce.dump_connectivity_to_file('../data/conn-before.txt') - connectivity = pce.get_connectivity_from_service_id(request) - if connectivity is None: return Empty() - #pce.dump_connectivity_to_file('../data/conn-after.txt') - - LOGGER.info('[DeleteService] connectivity = {:s}'.format(str(dump_connectivity(connectivity)))) - - for connection in connectivity.get('connections'): - delete_service( - self.database, self.context_client, self.device_client, self.service_handler_factory, - request, connection) - - for sub_service, sub_connections in connectivity.get('requirements', []): - for sub_connection in sub_connections: - delete_service( - self.database, self.context_client, self.device_client, self.service_handler_factory, - sub_service.service_id, sub_connection) + context_client = ContextClient() + # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is + # being modified. + _service : Optional[Service] = get_service(context_client, request) + if _service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request))) + service = Service() + service.CopyFrom(_service) + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL + context_client.SetService(service) + + # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. + # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of + # tasks to be executed) to implement the requested delete operation. + tasks_scheduler = TasksScheduler(self.service_handler_factory) + tasks_scheduler.compose_from_service(service, is_delete=True) + tasks_scheduler.execute_all() return Empty() diff --git a/src/service/service/Tools.py b/src/service/service/Tools.py deleted file mode 100644 index ea4369fd5..000000000 --- a/src/service/service/Tools.py +++ /dev/null @@ -1,358 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, logging -from typing import Any, Dict, List, Optional, Tuple -from common.orm.Database import Database -from common.orm.HighLevel import get_object, get_related_objects -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import ( - ConfigRule, Connection, ConnectionId, Constraint, EndPointId, Service, ServiceId, ServiceStatusEnum) -from common.rpc_method_wrapper.ServiceExceptions import ( - InvalidArgumentException, NotFoundException, OperationFailedException) -from context.client.ContextClient import ContextClient -from device.client.DeviceClient import DeviceClient -from .database.ConfigModel import ( - ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw) -from .database.ConstraintModel import ConstraintModel, ConstraintsModel, get_constraints, grpc_constraints_to_raw -from .database.DatabaseDeviceTools import sync_device_from_context -from .database.DatabaseServiceTools import ( - delete_service_from_context, sync_service_from_context, sync_service_to_context, update_service_in_local_database) -from .database.DeviceModel import DeviceModel, DriverModel -from .database.EndPointModel import EndPointModel, grpc_endpointids_to_raw -from .database.RelationModels import ServiceEndPointModel -from .database.ServiceModel import ServiceModel -from .service_handler_api._ServiceHandler import _ServiceHandler -from .service_handler_api.FilterFields import FilterFieldEnum -from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory -from .service_handler_api.Tools import ( - check_errors_deleteconfig, check_errors_deleteconstraint, check_errors_deleteendpoint, check_errors_setconfig, - check_errors_setconstraint, check_errors_setendpoint) - -LOGGER = logging.getLogger(__name__) - -def get_connection(context_client : ContextClient, connection_id : ConnectionId) -> Optional[Connection]: - try: - connection : Connection = context_client.GetConnection(connection_id) - return connection - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - return None - -def get_service(context_client : ContextClient, service_id : ServiceId) -> Optional[Service]: - try: - service : Service = context_client.GetService(service_id) - return service - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - return None - -def sync_devices_from_context( - context_client : ContextClient, database : Database, db_service : Optional[ServiceModel], - service_endpoint_ids : List[EndPointId] - ) -> Dict[str, DeviceModel]: - - required_device_uuids = set() - if db_service is not None: - db_endpoints = get_related_objects(db_service, ServiceEndPointModel, 'endpoint_fk') - for db_endpoint in db_endpoints: - db_device = get_object(database, DeviceModel, db_endpoint.device_fk, raise_if_not_found=False) - required_device_uuids.add(db_device.device_uuid) - - for endpoint_id in service_endpoint_ids: - required_device_uuids.add(endpoint_id.device_id.device_uuid.uuid) - - db_devices = {} - devices_not_found = set() - for device_uuid in required_device_uuids: - sync_device_from_context(device_uuid, context_client, database) - db_device = get_object(database, DeviceModel, device_uuid, raise_if_not_found=False) - if db_device is None: - devices_not_found.add(device_uuid) - else: - db_devices[device_uuid] = db_device - - if len(devices_not_found) > 0: - extra_details = ['Devices({:s}) cannot be retrieved from Context'.format(str(devices_not_found))] - raise NotFoundException('Device', '...', extra_details=extra_details) - - return db_devices - -def classify_config_rules( - db_service : ServiceModel, service_config_rules : List[ConfigRule], - resources_to_set: List[Tuple[str, Any]], resources_to_delete : List[Tuple[str, Any]]): - - context_config_rules = get_config_rules(db_service.database, db_service.pk, 'running') - context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} - #LOGGER.info('[classify_config_rules] context_config_rules = {:s}'.format(str(context_config_rules))) - - request_config_rules = grpc_config_rules_to_raw(service_config_rules) - #LOGGER.info('[classify_config_rules] request_config_rules = {:s}'.format(str(request_config_rules))) - - for config_rule in request_config_rules: - action, key, value = config_rule - if action == ORM_ConfigActionEnum.SET: - if (key not in context_config_rules) or (context_config_rules[key] != value): - resources_to_set.append((key, value)) - elif action == ORM_ConfigActionEnum.DELETE: - if key in context_config_rules: - resources_to_delete.append((key, value)) - else: - raise InvalidArgumentException('config_rule.action', str(action), extra_details=str(request_config_rules)) - - #LOGGER.info('[classify_config_rules] resources_to_set = {:s}'.format(str(resources_to_set))) - #LOGGER.info('[classify_config_rules] resources_to_delete = {:s}'.format(str(resources_to_delete))) - -def classify_constraints( - db_service : ServiceModel, service_constraints : List[Constraint], - constraints_to_set: List[Tuple[str, str]], constraints_to_delete : List[Tuple[str, str]]): - - context_constraints = get_constraints(db_service.database, db_service.pk, 'running') - context_constraints = {constraint[0]: constraint[1] for constraint in context_constraints} - #LOGGER.info('[classify_constraints] context_constraints = {:s}'.format(str(context_constraints))) - - request_constraints = grpc_constraints_to_raw(service_constraints) - #LOGGER.info('[classify_constraints] request_constraints = {:s}'.format(str(request_constraints))) - - for constraint in request_constraints: - constraint_type, constraint_value = constraint - if constraint_type in context_constraints: - if context_constraints[constraint_type] != constraint_value: - constraints_to_set.append(constraint) - else: - constraints_to_set.append(constraint) - context_constraints.pop(constraint_type, None) - - for constraint in context_constraints: - constraints_to_delete.append(constraint) - - #LOGGER.info('[classify_constraints] constraints_to_set = {:s}'.format(str(constraints_to_set))) - #LOGGER.info('[classify_constraints] constraints_to_delete = {:s}'.format(str(constraints_to_delete))) - -def get_service_endpointids(db_service : ServiceModel) -> List[Tuple[str, str, Optional[str]]]: - db_endpoints : List[EndPointModel] = get_related_objects(db_service, ServiceEndPointModel, 'endpoint_fk') - endpoint_ids = [db_endpoint.dump_id() for db_endpoint in db_endpoints] - return [ - (endpoint_id['device_id']['device_uuid']['uuid'], endpoint_id['endpoint_uuid']['uuid'], - endpoint_id.get('topology_id', {}).get('topology_uuid', {}).get('uuid', None)) - for endpoint_id in endpoint_ids - ] - -def classify_endpointids( - db_service : ServiceModel, service_endpoint_ids : List[EndPointId], - endpointids_to_set: List[Tuple[str, str, Optional[str]]], - endpointids_to_delete : List[Tuple[str, str, Optional[str]]]): - - context_endpoint_ids = get_service_endpointids(db_service) - #LOGGER.info('[classify_endpointids] context_endpoint_ids = {:s}'.format(str(context_endpoint_ids))) - context_endpoint_ids = set(context_endpoint_ids) - #LOGGER.info('[classify_endpointids] context_endpoint_ids = {:s}'.format(str(context_endpoint_ids))) - - request_endpoint_ids = grpc_endpointids_to_raw(service_endpoint_ids) - #LOGGER.info('[classify_endpointids] request_endpoint_ids = {:s}'.format(str(request_endpoint_ids))) - - if len(service_endpoint_ids) != 2: return - for endpoint_id in request_endpoint_ids: - #if endpoint_id not in context_endpoint_ids: - # endpointids_to_set.append(endpoint_id) - #context_endpoint_ids.discard(endpoint_id) - endpointids_to_set.append(endpoint_id) - - #for endpoint_id in context_endpoint_ids: - # endpointids_to_delete.append(endpoint_id) - - #LOGGER.info('[classify_endpointids] endpointids_to_set = {:s}'.format(str(endpointids_to_set))) - #LOGGER.info('[classify_endpointids] endpointids_to_delete = {:s}'.format(str(endpointids_to_delete))) - -def get_service_handler_class( - service_handler_factory : ServiceHandlerFactory, db_service : ServiceModel, db_devices : Dict[str, DeviceModel] - ) -> Optional[_ServiceHandler]: - - str_service_key = db_service.pk - database = db_service.database - - # Assume all devices involved in the service must support at least one driver in common - device_drivers = None - for _,db_device in db_devices.items(): - db_driver_pks = db_device.references(DriverModel) - db_driver_names = [DriverModel(database, pk).driver.value for pk,_ in db_driver_pks] - if device_drivers is None: - device_drivers = set(db_driver_names) - else: - device_drivers.intersection_update(db_driver_names) - - filter_fields = { - FilterFieldEnum.SERVICE_TYPE.value : db_service.service_type.value, # must be supported - FilterFieldEnum.DEVICE_DRIVER.value : device_drivers, # at least one must be supported - } - - msg = 'Selecting service handler for service({:s}) with filter_fields({:s})...' - LOGGER.info(msg.format(str(str_service_key), str(filter_fields))) - service_handler_class = service_handler_factory.get_service_handler_class(**filter_fields) - msg = 'ServiceHandler({:s}) selected for service({:s}) with filter_fields({:s})...' - LOGGER.info(msg.format(str(service_handler_class.__name__), str(str_service_key), str(filter_fields))) - return service_handler_class - -def update_service( - database : Database, context_client : ContextClient, device_client : DeviceClient, - service_handler_factory : ServiceHandlerFactory, service : Service, connection : Connection - ) -> ServiceModel: - - service_id = service.service_id - service_uuid = service_id.service_uuid.uuid - service_context_uuid = service_id.context_id.context_uuid.uuid - str_service_key = key_to_str([service_context_uuid, service_uuid]) - - # Sync before updating service to ensure we have devices, endpoints, constraints, and config rules to be - # set/deleted before actuallymodifying them in the local in-memory database. - - sync_service_from_context(service_context_uuid, service_uuid, context_client, database) - db_service = get_object(database, ServiceModel, str_service_key, raise_if_not_found=False) - db_devices = sync_devices_from_context(context_client, database, db_service, service.service_endpoint_ids) - - if db_service is None: db_service,_ = update_service_in_local_database(database, service) - LOGGER.info('[update_service] db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - - resources_to_set : List[Tuple[str, Any]] = [] # resource_key, resource_value - resources_to_delete : List[Tuple[str, Any]] = [] # resource_key, resource_value - classify_config_rules(db_service, service.service_config.config_rules, resources_to_set, resources_to_delete) - - constraints_to_set : List[Tuple[str, str]] = [] # constraint_type, constraint_value - constraints_to_delete : List[Tuple[str, str]] = [] # constraint_type, constraint_value - classify_constraints(db_service, service.service_constraints, constraints_to_set, constraints_to_delete) - - endpointids_to_set : List[Tuple[str, str, Optional[str]]] = [] # device_uuid, endpoint_uuid, topology_uuid - endpointids_to_delete : List[Tuple[str, str, Optional[str]]] = [] # device_uuid, endpoint_uuid, topology_uuid - classify_endpointids(db_service, service.service_endpoint_ids, endpointids_to_set, endpointids_to_delete) - - service_handler_class = get_service_handler_class(service_handler_factory, db_service, db_devices) - service_handler_settings = {} - service_handler : _ServiceHandler = service_handler_class( - db_service, database, context_client, device_client, **service_handler_settings) - - errors = [] - - if len(errors) == 0: - results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete) - errors.extend(check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint)) - - if len(errors) == 0: - results_deleteconstraint = service_handler.DeleteConstraint(constraints_to_delete) - errors.extend(check_errors_deleteconstraint(constraints_to_delete, results_deleteconstraint)) - - if len(errors) == 0: - results_deleteconfig = service_handler.DeleteConfig(resources_to_delete) - errors.extend(check_errors_deleteconfig(resources_to_delete, results_deleteconfig)) - - if len(errors) == 0: - results_setconfig = service_handler.SetConfig(resources_to_set) - errors.extend(check_errors_setconfig(resources_to_set, results_setconfig)) - - if len(errors) == 0: - results_setconstraint = service_handler.SetConstraint(constraints_to_set) - errors.extend(check_errors_setconstraint(constraints_to_set, results_setconstraint)) - - if len(errors) == 0: - results_setendpoint = service_handler.SetEndpoint(endpointids_to_set) - errors.extend(check_errors_setendpoint(endpointids_to_set, results_setendpoint)) - - if len(errors) > 0: - raise OperationFailedException('UpdateService', extra_details=errors) - - LOGGER.info('[update_service] len(service.service_endpoint_ids) = {:d}'.format(len(service.service_endpoint_ids))) - if len(service.service_endpoint_ids) >= 2: - service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE - - db_service,_ = update_service_in_local_database(database, service) - LOGGER.info('[update_service] db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - - sync_service_to_context(db_service, context_client) - context_client.SetConnection(connection) - return db_service - -def delete_service( - database : Database, context_client : ContextClient, device_client : DeviceClient, - service_handler_factory : ServiceHandlerFactory, service_id : ServiceId, connection : Connection - ) -> None: - - context_client.RemoveConnection(connection.connection_id) - - service_uuid = service_id.service_uuid.uuid - service_context_uuid = service_id.context_id.context_uuid.uuid - str_service_key = key_to_str([service_context_uuid, service_uuid]) - - # Sync before updating service to ensure we have devices, endpoints, constraints, and config rules to be - # set/deleted before actuallymodifying them in the local in-memory database. - - sync_service_from_context(service_context_uuid, service_uuid, context_client, database) - db_service : ServiceModel = get_object(database, ServiceModel, str_service_key, raise_if_not_found=False) - if db_service is None: return - LOGGER.info('[delete_service] db_service = {:s}'.format(str(db_service.dump( - include_endpoint_ids=True, include_constraints=True, include_config_rules=True)))) - - db_devices = sync_devices_from_context(context_client, database, db_service, []) - - resources_to_delete : List[Tuple[str, str]] = [ # resource_key, resource_value - (config_rule[1], config_rule[2]) - for config_rule in get_config_rules(db_service.database, db_service.pk, 'running') - ] - - constraints_to_delete : List[Tuple[str, str]] = [ # constraint_type, constraint_value - (constraint[0], constraint[1]) - for constraint in get_constraints(db_service.database, db_service.pk, 'running') - ] - - # device_uuid, endpoint_uuid, topology_uuid - endpointids_to_delete : List[Tuple[str, str, Optional[str]]] = list(set(get_service_endpointids(db_service))) - - service_handler_class = get_service_handler_class(service_handler_factory, db_service, db_devices) - service_handler_settings = {} - service_handler : _ServiceHandler = service_handler_class( - db_service, database, context_client, device_client, **service_handler_settings) - - errors = [] - - if len(errors) == 0: - results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete) - errors.extend(check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint)) - - if len(errors) == 0: - results_deleteconstraint = service_handler.DeleteConstraint(constraints_to_delete) - errors.extend(check_errors_deleteconstraint(constraints_to_delete, results_deleteconstraint)) - - if len(errors) == 0: - results_deleteconfig = service_handler.DeleteConfig(resources_to_delete) - errors.extend(check_errors_deleteconfig(resources_to_delete, results_deleteconfig)) - - if len(errors) > 0: - raise OperationFailedException('DeleteService', extra_details=errors) - - delete_service_from_context(db_service, context_client) - - for db_service_endpoint_pk,_ in db_service.references(ServiceEndPointModel): - ServiceEndPointModel(database, db_service_endpoint_pk).delete() - - db_running_config = ConfigModel(database, db_service.service_config_fk) - for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): - ConfigRuleModel(database, db_config_rule_pk).delete() - - db_running_constraints = ConstraintsModel(database, db_service.service_constraints_fk) - for db_constraint_pk,_ in db_running_constraints.references(ConstraintModel): - ConstraintModel(database, db_constraint_pk).delete() - - db_service.delete() - db_running_config.delete() - db_running_constraints.delete() diff --git a/src/service/service/service_handler_api/FilterFields.py b/src/service/service/service_handler_api/FilterFields.py index 9d8f9ad28..98113ba30 100644 --- a/src/service/service/service_handler_api/FilterFields.py +++ b/src/service/service/service_handler_api/FilterFields.py @@ -13,15 +13,30 @@ # limitations under the License. from enum import Enum -from service.service.database.ServiceModel import ORM_ServiceTypeEnum -from service.service.database.DeviceModel import ORM_DeviceDriverEnum +from common.proto.context_pb2 import DeviceDriverEnum, ServiceTypeEnum class FilterFieldEnum(Enum): SERVICE_TYPE = 'service_type' DEVICE_DRIVER = 'device_driver' +SERVICE_TYPE_VALUES = { + ServiceTypeEnum.SERVICETYPE_UNKNOWN, + ServiceTypeEnum.SERVICETYPE_L3NM, + ServiceTypeEnum.SERVICETYPE_L2NM, + ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE, +} + +DEVICE_DRIVER_VALUES = { + DeviceDriverEnum.DEVICEDRIVER_UNDEFINED, + DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, + DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API, + DeviceDriverEnum.DEVICEDRIVER_P4, + DeviceDriverEnum.DEVICEDRIVER_IETF_NETWORK_TOPOLOGY, + DeviceDriverEnum.DEVICEDRIVER_ONF_TR_352, +} + # Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified FILTER_FIELD_ALLOWED_VALUES = { - FilterFieldEnum.SERVICE_TYPE.value : {i.value for i in ORM_ServiceTypeEnum}, - FilterFieldEnum.DEVICE_DRIVER.value : {i.value for i in ORM_DeviceDriverEnum}, + FilterFieldEnum.SERVICE_TYPE.value : SERVICE_TYPE_VALUES, + FilterFieldEnum.DEVICE_DRIVER.value : DEVICE_DRIVER_VALUES, } diff --git a/src/service/service/service_handler_api/ServiceHandlerFactory.py b/src/service/service/service_handler_api/ServiceHandlerFactory.py index 8b7223a95..09a56775d 100644 --- a/src/service/service/service_handler_api/ServiceHandlerFactory.py +++ b/src/service/service/service_handler_api/ServiceHandlerFactory.py @@ -14,7 +14,9 @@ import logging, operator from enum import Enum -from typing import Any, Dict, Iterable, List, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from common.proto.context_pb2 import Device, Service +from common.tools.grpc.Tools import grpc_message_to_json_string from service.service.service_handler_api._ServiceHandler import _ServiceHandler from .Exceptions import ( UnsatisfiedFilterException, UnsupportedServiceHandlerClassException, UnsupportedFilterFieldException, @@ -91,3 +93,40 @@ class ServiceHandlerFactory: candidate_service_handler_classes = sorted( candidate_service_handler_classes.items(), key=operator.itemgetter(1), reverse=True) return candidate_service_handler_classes[0][0] + +def get_device_supported_drivers(device : Device) -> Set[int]: + return {device_driver for device_driver in device.device_drivers} + +def get_common_device_drivers(drivers_per_device : List[Set[int]]) -> Set[int]: + common_device_drivers = None + for device_drivers in drivers_per_device: + if common_device_drivers is None: + common_device_drivers = set(device_drivers) + else: + common_device_drivers.intersection_update(device_drivers) + if common_device_drivers is None: common_device_drivers = set() + return common_device_drivers + +def get_service_handler_class( + service_handler_factory : ServiceHandlerFactory, service : Service, connection_devices : Dict[str, Device] +) -> Optional[_ServiceHandler]: + + str_service_key = grpc_message_to_json_string(service.service_id) + + # Assume all devices involved in the service's connection must support at least one driver in common + common_device_drivers = get_common_device_drivers([ + get_device_supported_drivers(device) + for device in connection_devices.values() + ]) + + filter_fields = { + FilterFieldEnum.SERVICE_TYPE.value : service.service_type, # must be supported + FilterFieldEnum.DEVICE_DRIVER.value : common_device_drivers, # at least one must be supported + } + + MSG = 'Selecting service handler for service({:s}) with filter_fields({:s})...' + LOGGER.info(MSG.format(str(str_service_key), str(filter_fields))) + service_handler_class = service_handler_factory.get_service_handler_class(**filter_fields) + MSG = 'ServiceHandler({:s}) selected for service({:s}) with filter_fields({:s})...' + LOGGER.info(MSG.format(str(service_handler_class.__name__), str(str_service_key), str(filter_fields))) + return service_handler_class diff --git a/src/service/service/service_handler_api/_ServiceHandler.py b/src/service/service/service_handler_api/_ServiceHandler.py index e724ebcc9..170e842cd 100644 --- a/src/service/service/service_handler_api/_ServiceHandler.py +++ b/src/service/service/service_handler_api/_ServiceHandler.py @@ -38,12 +38,16 @@ class _ServiceHandler: """ raise NotImplementedError() - def SetEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]: + def SetEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: """ Set endpoints from a list. Parameters: endpoints : List[Tuple[str, str, Optional[str]]] List of tuples, each containing a device_uuid, endpoint_uuid and, optionally, the topology_uuid of the endpoint to be added. + connection_uuid : Optional[str] + If specified, is the UUID of the connection this endpoint is associated to. Returns: results : List[Union[bool, Exception]] List of results for endpoint changes requested. Return values must be in the same order than @@ -52,12 +56,16 @@ class _ServiceHandler: """ raise NotImplementedError() - def DeleteEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]: + def DeleteEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: """ Delete endpoints form a list. Parameters: endpoints : List[Tuple[str, str, Optional[str]]] List of tuples, each containing a device_uuid, endpoint_uuid, and the topology_uuid of the endpoint to be removed. + connection_uuid : Optional[str] + If specified, is the UUID of the connection this endpoint is associated to. Returns: results : List[Union[bool, Exception]] List of results for endpoint deletions requested. Return values must be in the same order than diff --git a/src/service/service/service_handlers/__init__.py b/src/service/service/service_handlers/__init__.py index 33e345c42..89e717722 100644 --- a/src/service/service/service_handlers/__init__.py +++ b/src/service/service/service_handlers/__init__.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ..service_handler_api.FilterFields import FilterFieldEnum, ORM_DeviceDriverEnum, ORM_ServiceTypeEnum +from common.proto.context_pb2 import DeviceDriverEnum, ServiceTypeEnum +from ..service_handler_api.FilterFields import FilterFieldEnum from .l2nm_emulated.L2NMEmulatedServiceHandler import L2NMEmulatedServiceHandler from .l3nm_emulated.L3NMEmulatedServiceHandler import L3NMEmulatedServiceHandler from .l3nm_openconfig.L3NMOpenConfigServiceHandler import L3NMOpenConfigServiceHandler @@ -21,26 +22,26 @@ from .tapi_tapi.TapiServiceHandler import TapiServiceHandler SERVICE_HANDLERS = [ (L2NMEmulatedServiceHandler, [ { - FilterFieldEnum.SERVICE_TYPE : ORM_ServiceTypeEnum.L2NM, - FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.UNDEFINED, + FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + FilterFieldEnum.DEVICE_DRIVER : DeviceDriverEnum.DEVICEDRIVER_UNDEFINED, } ]), (L3NMEmulatedServiceHandler, [ { - FilterFieldEnum.SERVICE_TYPE : ORM_ServiceTypeEnum.L3NM, - FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.UNDEFINED, + FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L3NM, + FilterFieldEnum.DEVICE_DRIVER : DeviceDriverEnum.DEVICEDRIVER_UNDEFINED, } ]), (L3NMOpenConfigServiceHandler, [ { - FilterFieldEnum.SERVICE_TYPE : ORM_ServiceTypeEnum.L3NM, - FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, + FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L3NM, + FilterFieldEnum.DEVICE_DRIVER : DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, } ]), (TapiServiceHandler, [ { - FilterFieldEnum.SERVICE_TYPE : ORM_ServiceTypeEnum.TAPI_CONNECTIVITY_SERVICE, - FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, + FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE, + FilterFieldEnum.DEVICE_DRIVER : DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API, } ]), ] diff --git a/src/service/service/service_handlers/l2nm_emulated/ConfigRules.py b/src/service/service/service_handlers/l2nm_emulated/ConfigRules.py new file mode 100644 index 000000000..be0f1fda5 --- /dev/null +++ b/src/service/service/service_handlers/l2nm_emulated/ConfigRules.py @@ -0,0 +1,268 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List +from common.proto.context_pb2 import EndPointId, Service +from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set +from service.service.service_handler_api.AnyTreeTools import TreeNode + +def setup_config_rules( + service_uuid : str, connection_uuid : str, device_uuid : str, endpoint_uuid : str, + service_settings : TreeNode, endpoint_settings : TreeNode +) -> List[Dict]: + + connection_short_uuid = connection_uuid.split('-')[-1] + network_instance_name = '{:s}-NetInst'.format(connection_short_uuid) + network_interface_desc = '{:s}-NetIf'.format(connection_uuid) + network_subinterface_desc = '{:s}-NetSubIf'.format(connection_uuid) + + if service_settings is None: + # MSG = 'Unable to retrieve settings for Service({:s})' + #raise Exception(MSG.format(connection_uuid)) + mtu = 1450 + bgp_as = 0 + bgp_route_target = '0:0' + else: + json_settings : Dict = service_settings.value + mtu = json_settings.get('mtu', 1450 ) # 1512 + #address_families = json_settings.get('address_families', [] ) # ['IPV4'] + bgp_as = json_settings.get('bgp_as', 0 ) # 65000 + bgp_route_target = json_settings.get('bgp_route_target', '0:0') # 65000:333 + + if endpoint_settings is None: + #MSG = 'Unable to retrieve settings for device({:s}/endpoint({:s}) in service({:s})' + #raise Exception(MSG.format(device_uuid, endpoint_uuid, connection_uuid)) + route_distinguisher = '0:0' + sub_interface_index = 0 + vlan_id = 1 + address_ip = '0.0.0.0' + address_prefix = 24 + if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) + else: + json_endpoint_settings : Dict = endpoint_settings.value + #router_id = json_endpoint_settings.get('router_id', '0.0.0.0') # '10.95.0.10' + route_distinguisher = json_endpoint_settings.get('route_distinguisher', '0:0' ) # '60001:801' + sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1 + vlan_id = json_endpoint_settings.get('vlan_id', 1 ) # 400 + address_ip = json_endpoint_settings.get('address_ip', '0.0.0.0') # '2.2.2.1' + address_prefix = json_endpoint_settings.get('address_prefix', 24 ) # 30 + if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) + + json_config_rules = [ + json_config_rule_set( + '/network_instance[{:s}]'.format(network_instance_name), { + 'name': network_instance_name, 'description': network_interface_desc, 'type': 'L3VRF', + 'route_distinguisher': route_distinguisher, + #'router_id': router_id, 'address_families': address_families, + }), + json_config_rule_set( + '/interface[{:s}]'.format(endpoint_uuid), { + 'name': endpoint_uuid, 'description': network_interface_desc, 'mtu': mtu, + }), + json_config_rule_set( + '/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), { + 'name': endpoint_uuid, 'index': sub_interface_index, + 'description': network_subinterface_desc, 'vlan_id': vlan_id, + 'address_ip': address_ip, 'address_prefix': address_prefix, + }), + json_config_rule_set( + '/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, if_subif_name), { + 'name': network_instance_name, 'id': if_subif_name, 'interface': endpoint_uuid, + 'subinterface': sub_interface_index, + }), + json_config_rule_set( + '/network_instance[{:s}]/protocols[BGP]'.format(network_instance_name), { + 'name': network_instance_name, 'identifier': 'BGP', 'protocol_name': 'BGP', 'as': bgp_as, + }), + json_config_rule_set( + '/network_instance[{:s}]/table_connections[STATIC][BGP][IPV4]'.format(network_instance_name), { + 'name': network_instance_name, 'src_protocol': 'STATIC', 'dst_protocol': 'BGP', + 'address_family': 'IPV4', #'default_import_policy': 'REJECT_ROUTE', + }), + json_config_rule_set( + '/network_instance[{:s}]/table_connections[DIRECTLY_CONNECTED][BGP][IPV4]'.format( + network_instance_name), { + 'name': network_instance_name, 'src_protocol': 'DIRECTLY_CONNECTED', 'dst_protocol': 'BGP', + 'address_family': 'IPV4', #'default_import_policy': 'REJECT_ROUTE', + }), + json_config_rule_set( + '/routing_policy/bgp_defined_set[{:s}_rt_import]'.format(network_instance_name), { + 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), + }), + json_config_rule_set( + '/routing_policy/bgp_defined_set[{:s}_rt_import][route-target:{:s}]'.format( + network_instance_name, bgp_route_target), { + 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), + 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), + }), + json_config_rule_set( + '/routing_policy/policy_definition[{:s}_import]'.format(network_instance_name), { + 'policy_name': '{:s}_import'.format(network_instance_name), + }), + json_config_rule_set( + '/routing_policy/policy_definition[{:s}_import]/statement[{:s}]'.format( + network_instance_name, '3'), { + 'policy_name': '{:s}_import'.format(network_instance_name), 'statement_name': '3', + 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), + 'match_set_options': 'ANY', 'policy_result': 'ACCEPT_ROUTE', + }), + json_config_rule_set( + # pylint: disable=duplicate-string-formatting-argument + '/network_instance[{:s}]/inter_instance_policies[{:s}_import]'.format( + network_instance_name, network_instance_name), { + 'name': network_instance_name, 'import_policy': '{:s}_import'.format(network_instance_name), + }), + json_config_rule_set( + '/routing_policy/bgp_defined_set[{:s}_rt_export]'.format(network_instance_name), { + 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), + }), + json_config_rule_set( + '/routing_policy/bgp_defined_set[{:s}_rt_export][route-target:{:s}]'.format( + network_instance_name, bgp_route_target), { + 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), + 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), + }), + json_config_rule_set( + '/routing_policy/policy_definition[{:s}_export]'.format(network_instance_name), { + 'policy_name': '{:s}_export'.format(network_instance_name), + }), + json_config_rule_set( + '/routing_policy/policy_definition[{:s}_export]/statement[{:s}]'.format( + network_instance_name, '3'), { + 'policy_name': '{:s}_export'.format(network_instance_name), 'statement_name': '3', + 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), + 'match_set_options': 'ANY', 'policy_result': 'ACCEPT_ROUTE', + }), + json_config_rule_set( + # pylint: disable=duplicate-string-formatting-argument + '/network_instance[{:s}]/inter_instance_policies[{:s}_export]'.format( + network_instance_name, network_instance_name), { + 'name': network_instance_name, 'export_policy': '{:s}_export'.format(network_instance_name), + }), + ] + + return json_config_rules + +def teardown_config_rules( + service_uuid : str, connection_uuid : str, device_uuid : str, endpoint_uuid : str, + service_settings : TreeNode, endpoint_settings : TreeNode +) -> List[Dict]: + + connection_short_uuid = connection_uuid.split('-')[-1] + network_instance_name = '{:s}-NetInst'.format(connection_short_uuid) + + if service_settings is None: + # MSG = 'Unable to retrieve settings for Service({:s})' + #raise Exception(MSG.format(connection_uuid)) + bgp_route_target = '0:0' + else: + json_settings : Dict = service_settings.value + bgp_route_target = json_settings.get('bgp_route_target', '0:0') # 65000:333 + + if endpoint_settings is None: + #MSG = 'Unable to retrieve settings for device({:s}/endpoint({:s}) in service({:s})' + #raise Exception(MSG.format(device_uuid, endpoint_uuid, connection_uuid)) + sub_interface_index = 0 + vlan_id = 1 + if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) + else: + json_endpoint_settings : Dict = endpoint_settings.value + sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1 + vlan_id = json_endpoint_settings.get('vlan_id', 1 ) # 400 + if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) + + json_config_rules = [ + json_config_rule_delete( + '/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, if_subif_name), { + 'name': network_instance_name, 'id': if_subif_name, + }), + json_config_rule_delete( + '/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), { + 'name': endpoint_uuid, 'index': sub_interface_index, + }), + json_config_rule_delete( + '/interface[{:s}]'.format(endpoint_uuid), { + 'name': endpoint_uuid, + }), + json_config_rule_delete( + '/network_instance[{:s}]/table_connections[DIRECTLY_CONNECTED][BGP][IPV4]'.format( + network_instance_name), { + 'name': network_instance_name, 'src_protocol': 'DIRECTLY_CONNECTED', 'dst_protocol': 'BGP', + 'address_family': 'IPV4', + }), + json_config_rule_delete( + '/network_instance[{:s}]/table_connections[STATIC][BGP][IPV4]'.format(network_instance_name), { + 'name': network_instance_name, 'src_protocol': 'STATIC', 'dst_protocol': 'BGP', + 'address_family': 'IPV4', + }), + json_config_rule_delete( + '/network_instance[{:s}]/protocols[BGP]'.format(network_instance_name), { + 'name': network_instance_name, 'identifier': 'BGP', 'protocol_name': 'BGP', + }), + json_config_rule_delete( + # pylint: disable=duplicate-string-formatting-argument + '/network_instance[{:s}]/inter_instance_policies[{:s}_import]'.format( + network_instance_name, network_instance_name), { + 'name': network_instance_name, + }), + json_config_rule_delete( + '/routing_policy/policy_definition[{:s}_import]/statement[{:s}]'.format( + network_instance_name, '3'), { + 'policy_name': '{:s}_import'.format(network_instance_name), 'statement_name': '3', + }), + json_config_rule_delete( + '/routing_policy/policy_definition[{:s}_import]'.format(network_instance_name), { + 'policy_name': '{:s}_import'.format(network_instance_name), + }), + json_config_rule_delete( + '/routing_policy/bgp_defined_set[{:s}_rt_import][route-target:{:s}]'.format( + network_instance_name, bgp_route_target), { + 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), + 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), + }), + json_config_rule_delete( + '/routing_policy/bgp_defined_set[{:s}_rt_import]'.format(network_instance_name), { + 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), + }), + json_config_rule_delete( + # pylint: disable=duplicate-string-formatting-argument + '/network_instance[{:s}]/inter_instance_policies[{:s}_export]'.format( + network_instance_name, network_instance_name), { + 'name': network_instance_name, + }), + json_config_rule_delete( + '/routing_policy/policy_definition[{:s}_export]/statement[{:s}]'.format( + network_instance_name, '3'), { + 'policy_name': '{:s}_export'.format(network_instance_name), 'statement_name': '3', + }), + json_config_rule_delete( + '/routing_policy/policy_definition[{:s}_export]'.format(network_instance_name), { + 'policy_name': '{:s}_export'.format(network_instance_name), + }), + json_config_rule_delete( + '/routing_policy/bgp_defined_set[{:s}_rt_export][route-target:{:s}]'.format( + network_instance_name, bgp_route_target), { + 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), + 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), + }), + json_config_rule_delete( + '/routing_policy/bgp_defined_set[{:s}_rt_export]'.format(network_instance_name), { + 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), + }), + json_config_rule_delete( + '/network_instance[{:s}]'.format(network_instance_name), { + 'name': network_instance_name + }), + ] + return json_config_rules diff --git a/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py b/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py index 889a60ad5..5b0bd0304 100644 --- a/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py +++ b/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py @@ -13,188 +13,65 @@ # limitations under the License. import anytree, json, logging -from typing import Any, Dict, List, Optional, Tuple, Union -from common.orm.Database import Database -from common.orm.HighLevel import get_object -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import Device -from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set +from typing import Any, List, Optional, Tuple, Union +from common.proto.context_pb2 import ConfigActionEnum, ConfigRule, DeviceId, Service +from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_length, chk_type -from context.client.ContextClient import ContextClient -from device.client.DeviceClient import DeviceClient -from service.service.database.ConfigModel import ORM_ConfigActionEnum, get_config_rules -from service.service.database.ContextModel import ContextModel -from service.service.database.DeviceModel import DeviceModel -from service.service.database.ServiceModel import ServiceModel from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.AnyTreeTools import TreeNode, delete_subnode, get_subnode, set_subnode_value +from service.service.task_scheduler.TaskExecutor import TaskExecutor +from .ConfigRules import setup_config_rules, teardown_config_rules LOGGER = logging.getLogger(__name__) class L2NMEmulatedServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called - self, db_service : ServiceModel, database : Database, context_client : ContextClient, - device_client : DeviceClient, **settings + self, service : Service, task_executor : TaskExecutor, **settings ) -> None: - self.__db_service = db_service - self.__database = database - self.__context_client = context_client # pylint: disable=unused-private-member - self.__device_client = device_client - - self.__db_context : ContextModel = get_object(self.__database, ContextModel, self.__db_service.context_fk) - str_service_key = key_to_str([self.__db_context.context_uuid, self.__db_service.service_uuid]) - db_config = get_config_rules(self.__database, str_service_key, 'running') + self.__service = service + self.__task_executor = task_executor # pylint: disable=unused-private-member self.__resolver = anytree.Resolver(pathattr='name') self.__config = TreeNode('.') - for action, resource_key, resource_value in db_config: - if action == ORM_ConfigActionEnum.SET: + for config_rule in service.service_config.config_rules: + action = config_rule.action + if config_rule.WhichOneof('config_rule') != 'custom': continue + resource_key = config_rule.custom.resource_key + resource_value = config_rule.custom.resource_value + if action == ConfigActionEnum.CONFIGACTION_SET: try: resource_value = json.loads(resource_value) except: # pylint: disable=bare-except pass set_subnode_value(self.__resolver, self.__config, resource_key, resource_value) - elif action == ORM_ConfigActionEnum.DELETE: + elif action == ConfigActionEnum.CONFIGACTION_DELETE: delete_subnode(self.__resolver, self.__config, resource_key) - def SetEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]: + def SetEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) == 0: return [] - service_uuid = self.__db_service.service_uuid - service_short_uuid = service_uuid.split('-')[-1] - network_instance_name = '{:s}-NetInst'.format(service_short_uuid) - network_interface_desc = '{:s}-NetIf'.format(service_uuid) - network_subinterface_desc = '{:s}-NetSubIf'.format(service_uuid) - + service_uuid = self.__service.service_id.service_uuid.uuid settings : TreeNode = get_subnode(self.__resolver, self.__config, '/settings', None) - if settings is None: raise Exception('Unable to retrieve service settings') - json_settings : Dict = settings.value - mtu = json_settings.get('mtu', 1450 ) # 1512 - #address_families = json_settings.get('address_families', [] ) # ['IPV4'] - bgp_as = json_settings.get('bgp_as', 0 ) # 65000 - bgp_route_target = json_settings.get('bgp_route_target', '0:0') # 65000:333 results = [] for endpoint in endpoints: try: chk_type('endpoint', endpoint, (tuple, list)) chk_length('endpoint', endpoint, min_length=2, max_length=3) - if len(endpoint) == 2: - device_uuid, endpoint_uuid = endpoint - else: - device_uuid, endpoint_uuid, _ = endpoint # ignore topology_uuid by now + device_uuid, endpoint_uuid = endpoint[0:2] # ignore topology_uuid by now endpoint_settings_uri = '/device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid) endpoint_settings : TreeNode = get_subnode(self.__resolver, self.__config, endpoint_settings_uri, None) - if endpoint_settings is None: - raise Exception('Unable to retrieve service settings for endpoint({:s})'.format( - str(endpoint_settings_uri))) - json_endpoint_settings : Dict = endpoint_settings.value - #router_id = json_endpoint_settings.get('router_id', '0.0.0.0') # '10.95.0.10' - route_distinguisher = json_endpoint_settings.get('route_distinguisher', '0:0' ) # '60001:801' - sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1 - vlan_id = json_endpoint_settings.get('vlan_id', 1 ) # 400 - address_ip = json_endpoint_settings.get('address_ip', '0.0.0.0') # '2.2.2.1' - address_prefix = json_endpoint_settings.get('address_prefix', 24 ) # 30 - if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) - db_device : DeviceModel = get_object(self.__database, DeviceModel, device_uuid, raise_if_not_found=True) - json_device = db_device.dump(include_config_rules=False, include_drivers=True, include_endpoints=True) - json_device_config : Dict = json_device.setdefault('device_config', {}) - json_device_config_rules : List = json_device_config.setdefault('config_rules', []) - json_device_config_rules.extend([ - json_config_rule_set( - '/network_instance[{:s}]'.format(network_instance_name), { - 'name': network_instance_name, 'description': network_interface_desc, 'type': 'L3VRF', - 'route_distinguisher': route_distinguisher, - #'router_id': router_id, 'address_families': address_families, - }), - json_config_rule_set( - '/interface[{:s}]'.format(endpoint_uuid), { - 'name': endpoint_uuid, 'description': network_interface_desc, 'mtu': mtu, - }), - json_config_rule_set( - '/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), { - 'name': endpoint_uuid, 'index': sub_interface_index, - 'description': network_subinterface_desc, 'vlan_id': vlan_id, - 'address_ip': address_ip, 'address_prefix': address_prefix, - }), - json_config_rule_set( - '/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, if_subif_name), { - 'name': network_instance_name, 'id': if_subif_name, 'interface': endpoint_uuid, - 'subinterface': sub_interface_index, - }), - json_config_rule_set( - '/network_instance[{:s}]/protocols[BGP]'.format(network_instance_name), { - 'name': network_instance_name, 'identifier': 'BGP', 'protocol_name': 'BGP', 'as': bgp_as, - }), - json_config_rule_set( - '/network_instance[{:s}]/table_connections[STATIC][BGP][IPV4]'.format(network_instance_name), { - 'name': network_instance_name, 'src_protocol': 'STATIC', 'dst_protocol': 'BGP', - 'address_family': 'IPV4', #'default_import_policy': 'REJECT_ROUTE', - }), - json_config_rule_set( - '/network_instance[{:s}]/table_connections[DIRECTLY_CONNECTED][BGP][IPV4]'.format( - network_instance_name), { - 'name': network_instance_name, 'src_protocol': 'DIRECTLY_CONNECTED', 'dst_protocol': 'BGP', - 'address_family': 'IPV4', #'default_import_policy': 'REJECT_ROUTE', - }), - json_config_rule_set( - '/routing_policy/bgp_defined_set[{:s}_rt_import]'.format(network_instance_name), { - 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), - }), - json_config_rule_set( - '/routing_policy/bgp_defined_set[{:s}_rt_import][route-target:{:s}]'.format( - network_instance_name, bgp_route_target), { - 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), - 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), - }), - json_config_rule_set( - '/routing_policy/policy_definition[{:s}_import]'.format(network_instance_name), { - 'policy_name': '{:s}_import'.format(network_instance_name), - }), - json_config_rule_set( - '/routing_policy/policy_definition[{:s}_import]/statement[{:s}]'.format( - network_instance_name, '3'), { - 'policy_name': '{:s}_import'.format(network_instance_name), 'statement_name': '3', - 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), - 'match_set_options': 'ANY', 'policy_result': 'ACCEPT_ROUTE', - }), - json_config_rule_set( - # pylint: disable=duplicate-string-formatting-argument - '/network_instance[{:s}]/inter_instance_policies[{:s}_import]'.format( - network_instance_name, network_instance_name), { - 'name': network_instance_name, 'import_policy': '{:s}_import'.format(network_instance_name), - }), - json_config_rule_set( - '/routing_policy/bgp_defined_set[{:s}_rt_export]'.format(network_instance_name), { - 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), - }), - json_config_rule_set( - '/routing_policy/bgp_defined_set[{:s}_rt_export][route-target:{:s}]'.format( - network_instance_name, bgp_route_target), { - 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), - 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), - }), - json_config_rule_set( - '/routing_policy/policy_definition[{:s}_export]'.format(network_instance_name), { - 'policy_name': '{:s}_export'.format(network_instance_name), - }), - json_config_rule_set( - '/routing_policy/policy_definition[{:s}_export]/statement[{:s}]'.format( - network_instance_name, '3'), { - 'policy_name': '{:s}_export'.format(network_instance_name), 'statement_name': '3', - 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), - 'match_set_options': 'ANY', 'policy_result': 'ACCEPT_ROUTE', - }), - json_config_rule_set( - # pylint: disable=duplicate-string-formatting-argument - '/network_instance[{:s}]/inter_instance_policies[{:s}_export]'.format( - network_instance_name, network_instance_name), { - 'name': network_instance_name, 'export_policy': '{:s}_export'.format(network_instance_name), - }), - ]) - self.__device_client.ConfigureDevice(Device(**json_device)) + json_config_rules = setup_config_rules( + service_uuid, connection_uuid, device_uuid, endpoint_uuid, settings, endpoint_settings) + + device = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) + for json_config_rule in json_config_rules: + device.device_config.config_rules.append(ConfigRule(**json_config_rule)) + self.__task_executor.configure_device(device) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to SetEndpoint({:s})'.format(str(endpoint))) @@ -202,127 +79,32 @@ class L2NMEmulatedServiceHandler(_ServiceHandler): return results - def DeleteEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]: + def DeleteEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) == 0: return [] - service_uuid = self.__db_service.service_uuid - service_short_uuid = service_uuid.split('-')[-1] - network_instance_name = '{:s}-NetInst'.format(service_short_uuid) - + service_uuid = self.__service.service_uuid.uuid settings : TreeNode = get_subnode(self.__resolver, self.__config, '/settings', None) - if settings is None: raise Exception('Unable to retrieve service settings') - json_settings : Dict = settings.value - bgp_route_target = json_settings.get('bgp_route_target', '0:0') # 65000:333 results = [] for endpoint in endpoints: try: chk_type('endpoint', endpoint, (tuple, list)) chk_length('endpoint', endpoint, min_length=2, max_length=3) - if len(endpoint) == 2: - device_uuid, endpoint_uuid = endpoint - else: - device_uuid, endpoint_uuid, _ = endpoint # ignore topology_uuid by now + device_uuid, endpoint_uuid = endpoint[0:2] # ignore topology_uuid by now endpoint_settings_uri = '/device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid) endpoint_settings : TreeNode = get_subnode(self.__resolver, self.__config, endpoint_settings_uri, None) - if endpoint_settings is None: - raise Exception('Unable to retrieve service settings for endpoint({:s})'.format( - str(endpoint_settings_uri))) - json_endpoint_settings : Dict = endpoint_settings.value - sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1 - vlan_id = json_endpoint_settings.get('vlan_id', 1 ) # 400 - if_subif_name = '{:s}.{:d}'.format(endpoint_uuid, vlan_id) - db_device : DeviceModel = get_object(self.__database, DeviceModel, device_uuid, raise_if_not_found=True) - json_device = db_device.dump(include_config_rules=False, include_drivers=True, include_endpoints=True) - json_device_config : Dict = json_device.setdefault('device_config', {}) - json_device_config_rules : List = json_device_config.setdefault('config_rules', []) - json_device_config_rules.extend([ - json_config_rule_delete( - '/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, if_subif_name), { - 'name': network_instance_name, 'id': if_subif_name, - }), - json_config_rule_delete( - '/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), { - 'name': endpoint_uuid, 'index': sub_interface_index, - }), - json_config_rule_delete( - '/interface[{:s}]'.format(endpoint_uuid), { - 'name': endpoint_uuid, - }), - json_config_rule_delete( - '/network_instance[{:s}]/table_connections[DIRECTLY_CONNECTED][BGP][IPV4]'.format( - network_instance_name), { - 'name': network_instance_name, 'src_protocol': 'DIRECTLY_CONNECTED', 'dst_protocol': 'BGP', - 'address_family': 'IPV4', - }), - json_config_rule_delete( - '/network_instance[{:s}]/table_connections[STATIC][BGP][IPV4]'.format(network_instance_name), { - 'name': network_instance_name, 'src_protocol': 'STATIC', 'dst_protocol': 'BGP', - 'address_family': 'IPV4', - }), - json_config_rule_delete( - '/network_instance[{:s}]/protocols[BGP]'.format(network_instance_name), { - 'name': network_instance_name, 'identifier': 'BGP', 'protocol_name': 'BGP', - }), - json_config_rule_delete( - # pylint: disable=duplicate-string-formatting-argument - '/network_instance[{:s}]/inter_instance_policies[{:s}_import]'.format( - network_instance_name, network_instance_name), { - 'name': network_instance_name, - }), - json_config_rule_delete( - '/routing_policy/policy_definition[{:s}_import]/statement[{:s}]'.format( - network_instance_name, '3'), { - 'policy_name': '{:s}_import'.format(network_instance_name), 'statement_name': '3', - }), - json_config_rule_delete( - '/routing_policy/policy_definition[{:s}_import]'.format(network_instance_name), { - 'policy_name': '{:s}_import'.format(network_instance_name), - }), - json_config_rule_delete( - '/routing_policy/bgp_defined_set[{:s}_rt_import][route-target:{:s}]'.format( - network_instance_name, bgp_route_target), { - 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), - 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), - }), - json_config_rule_delete( - '/routing_policy/bgp_defined_set[{:s}_rt_import]'.format(network_instance_name), { - 'ext_community_set_name': '{:s}_rt_import'.format(network_instance_name), - }), - json_config_rule_delete( - # pylint: disable=duplicate-string-formatting-argument - '/network_instance[{:s}]/inter_instance_policies[{:s}_export]'.format( - network_instance_name, network_instance_name), { - 'name': network_instance_name, - }), - json_config_rule_delete( - '/routing_policy/policy_definition[{:s}_export]/statement[{:s}]'.format( - network_instance_name, '3'), { - 'policy_name': '{:s}_export'.format(network_instance_name), 'statement_name': '3', - }), - json_config_rule_delete( - '/routing_policy/policy_definition[{:s}_export]'.format(network_instance_name), { - 'policy_name': '{:s}_export'.format(network_instance_name), - }), - json_config_rule_delete( - '/routing_policy/bgp_defined_set[{:s}_rt_export][route-target:{:s}]'.format( - network_instance_name, bgp_route_target), { - 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), - 'ext_community_member' : 'route-target:{:s}'.format(bgp_route_target), - }), - json_config_rule_delete( - '/routing_policy/bgp_defined_set[{:s}_rt_export]'.format(network_instance_name), { - 'ext_community_set_name': '{:s}_rt_export'.format(network_instance_name), - }), - json_config_rule_delete( - '/network_instance[{:s}]'.format(network_instance_name), { - 'name': network_instance_name - }), - ]) - self.__device_client.ConfigureDevice(Device(**json_device)) + json_config_rules = teardown_config_rules( + service_uuid, connection_uuid, device_uuid, endpoint_uuid, settings, endpoint_settings) + + device = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) + for json_config_rule in json_config_rules: + device.device_config.config_rules.append(ConfigRule(**json_config_rule)) + self.__task_executor.configure_device(device) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteEndpoint({:s})'.format(str(endpoint))) diff --git a/src/service/service/task_scheduler/ConnectionExpander.py b/src/service/service/task_scheduler/ConnectionExpander.py new file mode 100644 index 000000000..39c91b1ba --- /dev/null +++ b/src/service/service/task_scheduler/ConnectionExpander.py @@ -0,0 +1,66 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List, Optional, Tuple +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import Connection, Empty, EndPointId, Link +from context.client.ContextClient import ContextClient + +class ConnectionExpander: + def __init__(self) -> None: + self.context_client = ContextClient() + self.endpointkey_to_link : Dict[Tuple[str, str], Link] = dict() + self.refresh_links() + + def refresh_links(self) -> None: + links = self.context_client.ListLinks(Empty()) + for link in links.links: + for link_endpoint_id in link.link_endpoint_ids: + device_uuid = link_endpoint_id.device_id.device_uuid.uuid + endpoint_uuid = link_endpoint_id.endpoint_uuid.uuid + endpoint_key = (device_uuid, endpoint_uuid) + self.endpointkey_to_link[endpoint_key] = link + + def get_link_from_endpoint_id(self, endpoint_id : EndPointId, raise_if_not_found : bool = False) -> Optional[Link]: + device_uuid = endpoint_id.device_id.device_uuid.uuid + endpoint_uuid = endpoint_id.endpoint_uuid.uuid + endpoint_key = (device_uuid, endpoint_uuid) + link = self.endpointkey_to_link.get(endpoint_key) + if link is None and raise_if_not_found: + str_endpoint_id = grpc_message_to_json_string(endpoint_id) + raise Exception('Link for Endpoint({:s}) not found'.format(str_endpoint_id)) + return link + + def get_links(self, connection : Connection) -> List[Link]: + path_links = list() + last_link_uuid = None + for endpoint_id in connection.path_hops_endpoint_ids: + link = self.get_link_from_endpoint_id(endpoint_id, raise_if_not_found=True) + link_uuid = link.link_id.link_uuid.uuid + if last_link_uuid is None or last_link_uuid != link_uuid: + path_links.append(link) + last_link_uuid = link_uuid + return path_links + + def get_endpoints_traversed(self, connection : Connection) -> List[EndPointId]: + path_endpoint_ids = list() + last_link_uuid = None + for endpoint_id in connection.path_hops_endpoint_ids: + link = self.get_link_from_endpoint_id(endpoint_id, raise_if_not_found=True) + link_uuid = link.link_id.link_uuid.uuid + if last_link_uuid is None or last_link_uuid != link_uuid: + for link_endpoint_id in link.link_endpoint_ids: + path_endpoint_ids.append(link_endpoint_id) + last_link_uuid = link_uuid + return path_endpoint_ids diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py new file mode 100644 index 000000000..416e1698f --- /dev/null +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -0,0 +1,142 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Any, Dict, Optional, Union +from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceId, Service, ServiceId +from common.rpc_method_wrapper.ServiceExceptions import NotFoundException +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from service.service.service_handler_api._ServiceHandler import _ServiceHandler +from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory, get_service_handler_class +from service.service.tools.ContextGetters import get_connection, get_device, get_service +from service.service.tools.ObjectKeys import get_connection_key, get_device_key, get_service_key + +CacheableObject = Union[Connection, Device, Service] + +class CacheableObjectType(Enum): + CONNECTION = 'connection' + DEVICE = 'device' + SERVICE = 'service' + +class TaskExecutor: + def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None: + self._service_handler_factory = service_handler_factory + self._context_client = ContextClient() + self._device_client = DeviceClient() + self._grpc_objects_cache : Dict[str, CacheableObject] = dict() + + @property + def service_handler_factory(self) -> ServiceHandlerFactory: return self._service_handler_factory + + # ----- Common methods --------------------------------------------------------------------------------------------- + + def _load_grpc_object(self, object_type : CacheableObjectType, object_key : str) -> Optional[CacheableObject]: + object_key = '{:s}:{:s}'.format(object_type.value, object_key) + return self._grpc_objects_cache.get(object_key) + + def _store_grpc_object(self, object_type : CacheableObjectType, object_key : str, grpc_object) -> None: + object_key = '{:s}:{:s}'.format(object_type.value, object_key) + self._grpc_objects_cache[object_key] = grpc_object + + def _delete_grpc_object(self, object_type : CacheableObjectType, object_key : str) -> None: + object_key = '{:s}:{:s}'.format(object_type.value, object_key) + self._grpc_objects_cache.pop(object_key, None) + + def _store_editable_grpc_object( + self, object_type : CacheableObjectType, object_key : str, grpc_class, grpc_ro_object + ) -> Any: + grpc_rw_object = grpc_class() + grpc_rw_object.CopyFrom(grpc_ro_object) + self._store_grpc_object(object_type, object_key, grpc_rw_object) + return grpc_rw_object + + # ----- Connection-related methods --------------------------------------------------------------------------------- + + def get_connection(self, connection_id : ConnectionId) -> Connection: + connection_key = get_connection_key(connection_id) + connection = self._load_grpc_object(CacheableObjectType.CONNECTION, connection_key) + if connection is None: + connection = get_connection(self._context_client, connection_id) + if connection is None: raise NotFoundException('Connection', connection_key) + connection : Connection = self._store_editable_grpc_object( + CacheableObjectType.CONNECTION, connection_key, Connection, connection) + return connection + + def set_connection(self, connection : Connection) -> None: + connection_key = get_connection_key(connection.connection_id) + self._context_client.SetConnection(connection) + self._store_grpc_object(CacheableObjectType.CONNECTION, connection_key, connection) + + def delete_connection(self, connection_id : ConnectionId) -> None: + connection_key = get_connection_key(connection_id) + self._context_client.RemoveConnection(connection_id) + self._delete_grpc_object(CacheableObjectType.CONNECTION, connection_key) + + # ----- Device-related methods ------------------------------------------------------------------------------------- + + def get_device(self, device_id : DeviceId) -> Device: + device_key = get_device_key(device_id) + device = self._load_grpc_object(CacheableObjectType.DEVICE, device_key) + if device is None: + device = get_device(self._context_client, device_id) + if device is None: raise NotFoundException('Device', device_key) + device : Device = self._store_editable_grpc_object( + CacheableObjectType.DEVICE, device_key, Device, device) + return device + + def configure_device(self, device : Device) -> None: + device_key = get_device_key(device.device_id) + self._device_client.ConfigureDevice(device) + self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) + + def get_devices_from_connection(self, connection : Connection) -> Dict[str, Device]: + devices = dict() + for endpoint_id in connection.path_hops_endpoint_ids: + device = self.get_device(endpoint_id.device_id) + device_uuid = endpoint_id.device_id.device_uuid.uuid + if device is None: raise Exception('Device({:s}) not found'.format(str(device_uuid))) + devices[device_uuid] = device + return devices + + # ----- Service-related methods ------------------------------------------------------------------------------------ + + def get_service(self, service_id : ServiceId) -> Service: + service_key = get_service_key(service_id) + service = self._load_grpc_object(CacheableObjectType.SERVICE, service_key) + if service is None: + service = get_service(self._context_client, service_id) + if service is None: raise NotFoundException('Service', service_key) + service : service = self._store_editable_grpc_object( + CacheableObjectType.SERVICE, service_key, Service, service) + return service + + def set_service(self, service : Service) -> None: + service_key = get_service_key(service.service_id) + self._context_client.SetService(service) + self._store_grpc_object(CacheableObjectType.SERVICE, service_key, service) + + def delete_service(self, service_id : ServiceId) -> None: + service_key = get_service_key(service_id) + self._context_client.RemoveService(service_id) + self._delete_grpc_object(CacheableObjectType.SERVICE, service_key) + + # ----- Service Handler Factory ------------------------------------------------------------------------------------ + + def get_service_handler( + self, connection : Connection, service : Service, **service_handler_settings + ) -> _ServiceHandler: + connection_devices = self.get_devices_from_connection(connection) + service_handler_class = get_service_handler_class(self._service_handler_factory, service, connection_devices) + return service_handler_class(service, self, **service_handler_settings) diff --git a/src/service/service/task_scheduler/TaskScheduler.py b/src/service/service/task_scheduler/TaskScheduler.py new file mode 100644 index 000000000..e5656bd0d --- /dev/null +++ b/src/service/service/task_scheduler/TaskScheduler.py @@ -0,0 +1,179 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import graphlib, logging, queue +from typing import Dict, Tuple +from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum +from common.proto.pathcomp_pb2 import PathCompReply +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory +from service.service.tools.ObjectKeys import get_connection_key, get_service_key +from .tasks._Task import _Task +from .tasks.Task_ConnectionConfigure import Task_ConnectionConfigure +from .tasks.Task_ConnectionDeconfigure import Task_ConnectionDeconfigure +from .tasks.Task_ServiceDelete import Task_ServiceDelete +from .tasks.Task_ServiceSetStatus import Task_ServiceSetStatus +from .TaskExecutor import CacheableObjectType, TaskExecutor + +LOGGER = logging.getLogger(__name__) + +class TasksScheduler: + def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None: + self._dag = graphlib.TopologicalSorter() + self._executor = TaskExecutor(service_handler_factory) + self._tasks : Dict[str, _Task] = dict() + self._context_client = ContextClient() + + # ----- Helper methods --------------------------------------------------------------------------------------------- + + def _add_task_if_not_exists(self, task : _Task) -> str: + task_key = task.key + if task_key not in self._tasks: + self._tasks[task_key] = task + return task_key + + def _add_connection_to_executor_cache(self, connection : Connection) -> None: + connection_key = get_connection_key(connection.connection_id) + self._executor._store_editable_grpc_object( + CacheableObjectType.CONNECTION, connection_key, Connection, connection) + + def _add_service_to_executor_cache(self, service : Service) -> None: + service_key = get_service_key(service.service_id) + self._executor._store_editable_grpc_object( + CacheableObjectType.SERVICE, service_key, Service, service) + + # ----- Task & DAG composition methods ----------------------------------------------------------------------------- + + def _service_create(self, service_id : ServiceId) -> Tuple[str, str]: + service_planned_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PLANNED)) + + service_active_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) + + # activating a service requires the service is in planning state + self._dag.add(service_active_key, service_planned_key) + return service_planned_key, service_active_key + + def _service_remove(self, service_id : ServiceId) -> Tuple[str, str]: + service_removing_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL)) + + service_delete_key = self._add_task_if_not_exists(Task_ServiceDelete(self._executor, service_id)) + + # deleting a service requires the service is in removing state + self._dag.add(service_delete_key, service_removing_key) + return service_removing_key, service_delete_key + + def _connection_configure(self, connection_id : ConnectionId, service_id : ServiceId) -> str: + connection_configure_key = self._add_task_if_not_exists(Task_ConnectionConfigure( + self._executor, connection_id)) + + # the connection configuration depends on its connection's service being in planning state + service_planned_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PLANNED)) + self._dag.add(connection_configure_key, service_planned_key) + + # the connection's service depends on the connection configuration to transition to active state + service_active_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) + self._dag.add(service_active_key, connection_configure_key) + + return connection_configure_key + + def _connection_deconfigure(self, connection_id : ConnectionId, service_id : ServiceId) -> str: + connection_deconfigure_key = self._add_task_if_not_exists(Task_ConnectionDeconfigure( + self._executor, connection_id)) + + # the connection deconfiguration depends on its connection's service being in removing state + service_pending_removal_key = self._add_task_if_not_exists(Task_ServiceSetStatus( + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL)) + self._dag.add(connection_deconfigure_key, service_pending_removal_key) + + # the connection's service depends on the connection deconfiguration to transition to delete + service_delete_key = self._add_task_if_not_exists(Task_ServiceDelete( + self._executor, service_id)) + self._dag.add(service_delete_key, connection_deconfigure_key) + + return connection_deconfigure_key + + def compose_from_pathcompreply(self, pathcomp_reply : PathCompReply, is_delete : bool = False) -> None: + include_service = self._service_remove if is_delete else self._service_create + include_connection = self._connection_deconfigure if is_delete else self._connection_configure + + for service in pathcomp_reply.services: + include_service(service.service_id) + self._add_service_to_executor_cache(service) + + for connection in pathcomp_reply.connections: + connection_key = include_connection(connection.connection_id, connection.service_id) + self._add_connection_to_executor_cache(connection) + self._executor.get_service(connection.service_id) + for sub_service_id in connection.sub_service_ids: + _,service_key_done = include_service(sub_service_id) + self._executor.get_service(sub_service_id) + self._dag.add(connection_key, service_key_done) + + def compose_from_service(self, service : Service, is_delete : bool = False) -> None: + include_service = self._service_remove if is_delete else self._service_create + include_connection = self._connection_deconfigure if is_delete else self._connection_configure + + pending_items_to_explore = queue.Queue() + pending_items_to_explore.put(service) + + while not pending_items_to_explore.empty(): + item = pending_items_to_explore.get() + + if isinstance(item, Service): + include_service(item.service_id) + self._add_service_to_executor_cache(item) + connections = self._context_client.ListConnections(item.service_id) + for connection in connections: + self._add_connection_to_executor_cache(connection) + pending_items_to_explore.put(connection) + + elif isinstance(item, ServiceId): + include_service(item) + self._executor.get_service(item) + connections = self._context_client.ListConnections(item) + for connection in connections: + self._add_connection_to_executor_cache(connection) + pending_items_to_explore.put(connection) + + elif isinstance(item, Connection): + connection_key = include_connection(item) + self._add_connection_to_executor_cache(connection) + self._executor.get_service(item.service_id) + pending_items_to_explore.put(item.service_id) + for sub_service_id in connection.sub_service_ids: + _,service_key_done = include_service(sub_service_id) + self._executor.get_service(sub_service_id) + self._dag.add(connection_key, service_key_done) + + else: + MSG = 'Unsupported item {:s}({:s})' + raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item))) + + def execute_all(self, dry_run : bool = False) -> None: + ordered_task_keys = list(self._dag.static_order()) + LOGGER.info('ordered_task_keys={:s}'.format(str(ordered_task_keys))) + + results = [] + for task_key in ordered_task_keys: + task = self._tasks.get(task_key) + succeeded = True if dry_run else task.execute() + results.append(succeeded) + + return zip(ordered_task_keys, results) diff --git a/src/service/service/task_scheduler/__init__.py b/src/service/service/task_scheduler/__init__.py new file mode 100644 index 000000000..70bfa5118 --- /dev/null +++ b/src/service/service/task_scheduler/__init__.py @@ -0,0 +1,51 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TaskScheduler is initialized with a PathComputation Reply or a Service, and it collects/identifies the sub-services, +# sub-connections, and operations associated to them. It discovers and takes care of the inter-dependencies among them, +# and produces an ordered list of tasks to be executed to implement the desired create/delete operation on the service. +# E.g., a service cannot be deleted if connections supporting that service still exist. If these connections are +# supported by sub-services, the connection needs to be torn down before destroying the services. +# +# Internally, it composes a Directed Acyclic Graph (DAG) of dependencies between tasks. Each task performs a specific +# operation on a connection or service. The DAG composition is based on information extracted from a PathComp reply +# and/or interrogating the Context component. +# +# Example: +# A B C +# *---L3---*---L3---* +# *--L0--* *--L0--* +# - L3 service between A and C, depends on L3 connections A-B and B-C. +# - Each L3 connection is supported by an L0 service and its corresponding L0 connection. +# +# Dependency structure: +# service L3:A-C +# connection L3:A-B +# service L0:A-B +# connection L0:A-B +# connection L3:B-C +# service L0:B-C +# connection L0:B-C +# +# Resolution: +# - service.set(L3:A-C, state=PLANNING) +# - service.set(L0:A-B, state=PLANNING) +# - connection.configure(L0:A-B) +# - service.set(L0:A-B, state=ACTIVE) +# - connection.configure(L3:A-B) +# - service.set(L0:B-C, state=PLANNING) +# - connection.configure(L0:B-C) +# - service.set(L0:B-C, state=ACTIVE) +# - connection.configure(L3:B-C) +# - service.set(L3:A-C, state=ACTIVE) diff --git a/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py b/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py new file mode 100644 index 000000000..ea9692142 --- /dev/null +++ b/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py @@ -0,0 +1,63 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ConnectionId +from common.rpc_method_wrapper.ServiceExceptions import OperationFailedException +from common.tools.grpc.Tools import grpc_message_to_json_string +from service.service.service_handler_api.Tools import check_errors_setendpoint +from service.service.task_scheduler.ConnectionExpander import ConnectionExpander +from service.service.task_scheduler.TaskExecutor import TaskExecutor +from service.service.tools.EndpointIdFormatters import endpointids_to_raw +from service.service.tools.ObjectKeys import get_connection_key +from ._Task import _Task + +KEY_TEMPLATE = 'connection({connection_id:s}):configure' + +class Task_ConnectionConfigure(_Task): + def __init__(self, task_executor : TaskExecutor, connection_id : ConnectionId) -> None: + super().__init__(task_executor) + self._connection_id = connection_id + + @property + def connection_id(self) -> ConnectionId: return self._connection_id + + @staticmethod + def build_key(connection_id : ConnectionId) -> str: + str_connection_id = get_connection_key(connection_id) + return KEY_TEMPLATE.format(connection_id=str_connection_id) + + @property + def key(self) -> str: return self.build_key(self._connection_id) + + def execute(self) -> None: + connection = self._task_executor.get_connection(self._connection_id) + service = self._task_executor.get_service(connection.service_id) + + service_handler_settings = {} + service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings) + + connection_expander = ConnectionExpander() + traversed_endpoint_ids = connection_expander.get_endpoints_traversed(connection) + endpointids_to_set = endpointids_to_raw(traversed_endpoint_ids) + + connection_uuid = connection.connection_id.connection_uuid.uuid + results_setendpoint = service_handler.SetEndpoint(endpointids_to_set, connection_uuid=connection_uuid) + errors = check_errors_setendpoint(endpointids_to_set, results_setendpoint) + if len(errors) > 0: + MSG = 'SetEndpoint for Connection({:s}) from Service({:s})' + str_connection = grpc_message_to_json_string(connection) + str_service = grpc_message_to_json_string(service) + raise OperationFailedException(MSG.format(str_connection, str_service), extra_details=errors) + + self._task_executor.set_connection(connection) diff --git a/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py b/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py new file mode 100644 index 000000000..fc849560e --- /dev/null +++ b/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py @@ -0,0 +1,63 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ConnectionId +from common.rpc_method_wrapper.ServiceExceptions import OperationFailedException +from common.tools.grpc.Tools import grpc_message_to_json_string +from service.service.service_handler_api.Tools import check_errors_deleteendpoint +from service.service.task_scheduler.ConnectionExpander import ConnectionExpander +from service.service.task_scheduler.TaskExecutor import TaskExecutor +from service.service.tools.EndpointIdFormatters import endpointids_to_raw +from service.service.tools.ObjectKeys import get_connection_key +from ._Task import _Task + +KEY_TEMPLATE = 'connection({connection_id:s}):deconfigure' + +class Task_ConnectionDeconfigure(_Task): + def __init__(self, task_executor : TaskExecutor, connection_id : ConnectionId) -> None: + super().__init__(task_executor) + self._connection_id = connection_id + + @property + def connection_id(self) -> ConnectionId: return self._connection_id + + @staticmethod + def build_key(connection_id : ConnectionId) -> str: + str_connection_id = get_connection_key(connection_id) + return KEY_TEMPLATE.format(connection_id=str_connection_id) + + @property + def key(self) -> str: return self.build_key(self._connection_id) + + def execute(self) -> None: + connection = self._task_executor.get_connection(self._connection_id) + service = self._task_executor.get_service(connection.service_id) + + service_handler_settings = {} + service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings) + + connection_expander = ConnectionExpander() + traversed_endpoint_ids = connection_expander.get_endpoints_traversed(connection) + endpointids_to_delete = endpointids_to_raw(traversed_endpoint_ids) + + connection_uuid = connection.connection_id.connection_uuid.uuid + results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete, connection_uuid=connection_uuid) + errors = check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint) + if len(errors) > 0: + MSG = 'DeleteEndpoint for Connection({:s}) from Service({:s})' + str_connection = grpc_message_to_json_string(connection) + str_service = grpc_message_to_json_string(service) + raise OperationFailedException(MSG.format(str_connection, str_service), extra_details=errors) + + self._task_executor.delete_connection(self._connection_id) diff --git a/src/service/service/task_scheduler/tasks/Task_ServiceDelete.py b/src/service/service/task_scheduler/tasks/Task_ServiceDelete.py new file mode 100644 index 000000000..15da1ffed --- /dev/null +++ b/src/service/service/task_scheduler/tasks/Task_ServiceDelete.py @@ -0,0 +1,39 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ServiceId +from service.service.task_scheduler.TaskExecutor import TaskExecutor +from service.service.tools.ObjectKeys import get_service_key +from ._Task import _Task + +KEY_TEMPLATE = 'service({service_id:s}):delete' + +class Task_ServiceDelete(_Task): + def __init__(self, task_executor : TaskExecutor, service_id : ServiceId) -> None: + super().__init__(task_executor) + self._service_id = service_id + + @property + def service_id(self) -> ServiceId: return self._service_id + + @staticmethod + def build_key(service_id : ServiceId) -> str: + str_service_id = get_service_key(service_id) + return KEY_TEMPLATE.format(service_id=str_service_id) + + @property + def key(self) -> str: return self.build_key(self._service_id) + + def execute(self) -> None: + self._task_executor.delete_service(self._service_id) diff --git a/src/service/service/task_scheduler/tasks/Task_ServiceSetStatus.py b/src/service/service/task_scheduler/tasks/Task_ServiceSetStatus.py new file mode 100644 index 000000000..163954f1b --- /dev/null +++ b/src/service/service/task_scheduler/tasks/Task_ServiceSetStatus.py @@ -0,0 +1,46 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ServiceId, ServiceStatusEnum +from service.service.task_scheduler.TaskExecutor import TaskExecutor +from service.service.tools.ObjectKeys import get_service_key +from ._Task import _Task + +KEY_TEMPLATE = 'service({service_id:s}):set_status({new_status:s})' + +class Task_ServiceSetStatus(_Task): + def __init__(self, task_executor : TaskExecutor, service_id : ServiceId, new_status : ServiceStatusEnum) -> None: + super().__init__(task_executor) + self._service_id = service_id + self._new_status = new_status + + @property + def service_id(self) -> ServiceId: return self._service_id + + @property + def new_status(self) -> ServiceStatusEnum: return self._new_status + + @staticmethod + def build_key(service_id : ServiceId, new_status : ServiceStatusEnum) -> str: + str_service_id = get_service_key(service_id) + str_new_status = ServiceStatusEnum.Name(new_status) + return KEY_TEMPLATE.format(service_id=str_service_id, new_status=str_new_status) + + @property + def key(self) -> str: return self.build_key(self._service_id, self._new_status) + + def execute(self) -> None: + service = self._task_executor.get_service(self._service_id) + service.service_status.service_status = self._new_status + self._task_executor.set_service(service) diff --git a/src/service/service/task_scheduler/tasks/_Task.py b/src/service/service/task_scheduler/tasks/_Task.py new file mode 100644 index 000000000..c36f92973 --- /dev/null +++ b/src/service/service/task_scheduler/tasks/_Task.py @@ -0,0 +1,30 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from service.service.task_scheduler.TaskExecutor import TaskExecutor + +class _Task: + def __init__(self, task_executor : TaskExecutor) -> None: + self._task_executor = task_executor + + @staticmethod + def build_key() -> str: + raise NotImplementedError('Task:build_key() not implemented') + + @property + def key(self) -> str: + raise NotImplementedError('Task:key() not implemented') + + def execute(self) -> bool: + raise NotImplementedError('Task:execute() not implemented') diff --git a/src/service/service/task_scheduler/tasks/__init__.py b/src/service/service/task_scheduler/tasks/__init__.py new file mode 100644 index 000000000..70a332512 --- /dev/null +++ b/src/service/service/task_scheduler/tasks/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/service/service/tools/ContextGetters.py b/src/service/service/tools/ContextGetters.py new file mode 100644 index 000000000..79ccf956b --- /dev/null +++ b/src/service/service/tools/ContextGetters.py @@ -0,0 +1,42 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from typing import Optional +from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceId, Service, ServiceId +from context.client.ContextClient import ContextClient + +def get_connection(context_client : ContextClient, connection_id : ConnectionId) -> Optional[Connection]: + try: + connection : Connection = context_client.GetConnection(connection_id) + return connection + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None + +def get_device(context_client : ContextClient, device_id : DeviceId) -> Optional[Device]: + try: + device : Device = context_client.GetDevice(device_id) + return device + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None + +def get_service(context_client : ContextClient, service_id : ServiceId) -> Optional[Service]: + try: + service : Service = context_client.GetService(service_id) + return service + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None diff --git a/src/service/service/tools/EndpointIdFormatters.py b/src/service/service/tools/EndpointIdFormatters.py new file mode 100644 index 000000000..2435df42c --- /dev/null +++ b/src/service/service/tools/EndpointIdFormatters.py @@ -0,0 +1,27 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional, Tuple +from common.proto.context_pb2 import EndPointId + +def endpointids_to_raw(traversed_endpoint_ids : List[EndPointId]) -> List[Tuple[str, str, Optional[str]]]: + raw_endpoint_ids : List[Tuple[str, str, Optional[str]]] = [] + for endpoint_id in traversed_endpoint_ids: + device_uuid = endpoint_id.device_id.device_uuid.uuid + endpoint_uuid = endpoint_id.endpoint_uuid.uuid + topology_uuid = endpoint_id.topology_id.topology_uuid.uuid + if len(topology_uuid) == 0: topology_uuid = None + endpoint_id_tuple = device_uuid, endpoint_uuid, topology_uuid + raw_endpoint_ids.append(endpoint_id_tuple) + return raw_endpoint_ids diff --git a/src/service/service/tools/ObjectKeys.py b/src/service/service/tools/ObjectKeys.py new file mode 100644 index 000000000..e58d8bd3e --- /dev/null +++ b/src/service/service/tools/ObjectKeys.py @@ -0,0 +1,26 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ConnectionId, DeviceId, ServiceId + +def get_connection_key(connection_id : ConnectionId) -> str: + return connection_id.connection_uuid.uuid + +def get_device_key(device_id : DeviceId) -> str: + return device_id.device_uuid.uuid + +def get_service_key(service_id : ServiceId) -> str: + context_uuid = service_id.context_id.context_uuid.uuid + service_uuid = service_id.service_uuid.uuid + return '{:s}/{:s}'.format(context_uuid, service_uuid) diff --git a/src/service/service/tools/__init__.py b/src/service/service/tools/__init__.py new file mode 100644 index 000000000..70a332512 --- /dev/null +++ b/src/service/service/tools/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/service/tests/test_unitary_dependency_resolver.py b/src/service/tests/test_unitary_task_scheduler.py similarity index 65% rename from src/service/tests/test_unitary_dependency_resolver.py rename to src/service/tests/test_unitary_task_scheduler.py index 1dd70ba7e..020386d76 100644 --- a/src/service/tests/test_unitary_dependency_resolver.py +++ b/src/service/tests/test_unitary_task_scheduler.py @@ -12,16 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, operator -from common.proto.context_pb2 import Connection, Service +import logging +#from common.proto.context_pb2 import Connection, Service from common.proto.pathcomp_pb2 import PathCompReply from common.tools.grpc.Tools import grpc_message_to_json_string -from service.service.DependencyResolver import resolve_dependencies +from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory +from service.service.task_scheduler.TaskScheduler import TasksScheduler +from .PrepareTestScenario import context_client # pylint: disable=unused-import LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -def test_dependency_resolver(): +def test_task_scheduler(): # test: add services and connections that depend on each other # then, check if they are properly resolved. # - service MAIN, depends on connection PKT-1, TAPI, and PKT-2 @@ -68,31 +70,27 @@ def test_dependency_resolver(): connection_tapi.sub_service_ids.append(service_tapi2.service_id) LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) - resolution = resolve_dependencies(pathcomp_reply) - LOGGER.info('resolution={:s}'.format(str(list(map(operator.itemgetter(0), resolution))))) - - CORRECT_RESOLUTION_KEYS = [ - ('connection', 'PKT-1' ), - ('connection', 'PKT-2' ), - ('connection', 'TAPI-1' ), - ('connection', 'TAPI-2' ), - ('service' , 'admin/TAPI-1'), - ('service' , 'admin/TAPI-2'), - ('connection', 'TAPI' ), - ('service' , 'admin/MAIN' ), + + service_handler_factory = ServiceHandlerFactory([]) + task_scheduler = TasksScheduler(service_handler_factory) + task_scheduler.compose_from_pathcompreply(pathcomp_reply) + tasks_and_results = list(task_scheduler.execute_all(dry_run=True)) + + LOGGER.info('tasks_and_results={:s}'.format(str(tasks_and_results))) + + CORRECT_ORDERED_TASK_KEYS = [ + 'service(admin/MAIN):set_status(SERVICESTATUS_PLANNED)', + 'service(admin/TAPI-1):set_status(SERVICESTATUS_PLANNED)', + 'service(admin/TAPI-2):set_status(SERVICESTATUS_PLANNED)', + 'connection(PKT-1):configure', + 'connection(PKT-2):configure', + 'connection(TAPI-1):configure', + 'connection(TAPI-2):configure', + 'service(admin/TAPI-1):set_status(SERVICESTATUS_ACTIVE)', + 'service(admin/TAPI-2):set_status(SERVICESTATUS_ACTIVE)', + 'connection(TAPI):configure', + 'service(admin/MAIN):set_status(SERVICESTATUS_ACTIVE)' ] - for (resolved_key,(resolved_objid, resolved_obj)),correct_key in zip(resolution, CORRECT_RESOLUTION_KEYS): - assert resolved_key == correct_key - assert resolved_obj is not None - if resolved_key[0] == 'connection': - assert isinstance(resolved_obj, Connection) - assert resolved_objid == resolved_obj.connection_id - connection_key = resolved_obj.connection_id.connection_uuid.uuid - assert resolved_key[1] == connection_key - elif resolved_key[0] == 'service': - assert isinstance(resolved_obj, Service) - assert resolved_objid == resolved_obj.service_id - context_uuid = resolved_obj.service_id.context_id.context_uuid.uuid - service_uuid = resolved_obj.service_id.service_uuid.uuid - service_key = '/'.join([context_uuid, service_uuid]) - assert resolved_key[1] == service_key + + for (task_key,_),correct_key in zip(tasks_and_results, CORRECT_ORDERED_TASK_KEYS): + assert task_key == correct_key -- GitLab