Loading src/service/Dockerfile +1 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,7 @@ COPY src/context/. context/ COPY src/device/. device/ COPY src/pathcomp/frontend/. pathcomp/frontend/ COPY src/service/. service/ COPY src/e2eorchestrator/. e2eorchestrator/ # Start the service ENTRYPOINT ["python", "-m", "service.service"] src/service/service/ServiceServiceServicerImpl.py +28 −0 Original line number Diff line number Diff line Loading @@ -21,10 +21,12 @@ from common.method_wrappers.ServiceExceptions import ( from common.proto.context_pb2 import ( Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum) from common.proto.pathcomp_pb2 import PathCompRequest from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest from common.proto.service_pb2_grpc import ServiceServiceServicer from common.tools.context_queries.Service import get_service_by_id from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from context.client.ContextClient import ContextClient from e2eorchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient from pathcomp.frontend.client.PathCompClient import PathCompClient from service.service.tools.ConnectionToString import connection_to_string from service.client.TEServiceClient import TEServiceClient Loading Loading @@ -153,6 +155,32 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): str_service_status = ServiceStatusEnum.Name(service_status.service_status) raise Exception(MSG.format(service_key, str_service_status)) if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E: # End-to-End service: service_id_with_uuids = context_client.SetService(request) service_with_uuids = get_service_by_id( context_client, service_id_with_uuids, rw_copy=False, include_config_rules=True, include_constraints=True, include_endpoint_ids=True) e2e_orch_request = E2EOrchestratorRequest() e2e_orch_request.service.CopyFrom(service_with_uuids) e2e_orch_client = E2EOrchestratorClient() e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request) # Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies # inter-dependencies among the services and connections retrieved and produces a # schedule of tasks (an ordered list of tasks to be executed) to implement the # requested create/update operation. tasks_scheduler = TasksScheduler(self.service_handler_factory) # e2e_orch_reply should be compatible with pathcomp_reply # TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply() tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False) tasks_scheduler.execute_all() return service_with_uuids.service_id # Normal service del service.service_endpoint_ids[:] # pylint: disable=no-member for endpoint_id in request.service_endpoint_ids: Loading src/service/service/service_handler_api/FilterFields.py +2 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ SERVICE_TYPE_VALUES = { ServiceTypeEnum.SERVICETYPE_L2NM, ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE, ServiceTypeEnum.SERVICETYPE_TE, ServiceTypeEnum.SERVICETYPE_E2E, } DEVICE_DRIVER_VALUES = { Loading @@ -37,6 +38,7 @@ DEVICE_DRIVER_VALUES = { DeviceDriverEnum.DEVICEDRIVER_XR, DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN, DeviceDriverEnum.DEVICEDRIVER_GNMI_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE, } # Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified Loading src/service/service/service_handlers/__init__.py +7 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ from .microwave.MicrowaveServiceHandler import MicrowaveServiceHandler from .p4.p4_service_handler import P4ServiceHandler from .tapi_tapi.TapiServiceHandler import TapiServiceHandler from .tapi_xr.TapiXrServiceHandler import TapiXrServiceHandler from .e2e_orch.E2EOrchestratorServiceHandler import E2EOrchestratorServiceHandler SERVICE_HANDLERS = [ (L2NMEmulatedServiceHandler, [ Loading Loading @@ -86,4 +87,10 @@ SERVICE_HANDLERS = [ FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN], } ]), (E2EOrchestratorServiceHandler, [ { FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_E2E, FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE], } ]), ] src/service/service/service_handlers/e2e_orch/E2EOrchestratorServiceHandler.py 0 → 100644 +176 −0 Original line number Diff line number Diff line # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging from typing import Any, Dict, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, DeviceId, Service from common.tools.object_factory.ConfigRule import json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type from service.service.service_handler_api.Tools import get_device_endpoint_uuids from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'e2e_orch'}) class E2EOrchestratorServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called self, service : Service, task_executor : TaskExecutor, **settings ) -> None: self.__service = service self.__task_executor = task_executor self.__settings_handler = SettingsHandler(service.service_config, **settings) @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) < 2: return [] service_uuid = self.__service.service_id.service_uuid.uuid settings = self.__settings_handler.get('/settings') json_settings : Dict = {} if settings is None else settings.value bitrate = json_settings.get('bitrate', 1000) results = [] try: src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) src_controller = self.__task_executor.get_device_controller(src_device) if src_controller is None: src_controller = src_device dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[-1]) dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) dst_controller = self.__task_executor.get_device_controller(dst_device) if dst_controller is None: dst_controller = dst_device controller = src_controller json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { 'uuid' : service_uuid, 'src_node' : src_endpoint_uuid, 'dst_node' : dst_endpoint_uuid, 'bitrate' : bitrate }) del controller.device_config.config_rules[:] controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) self.__task_executor.configure_device(controller) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to SetEndpoint for Service({:s})'.format(str(service_uuid))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) < 2: return [] service_uuid = self.__service.service_id.service_uuid.uuid settings = self.__settings_handler.get('/settings') json_settings : Dict = {} if settings is None else settings.value flow_id = json_settings.get('flow_id', 100) bitrate = json_settings.get('bitrate', 1000) results = [] try: src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) src_controller = self.__task_executor.get_device_controller(src_device) if src_controller is None: src_controller = src_device dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[1]) dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) dst_controller = self.__task_executor.get_device_controller(dst_device) if dst_controller is None: dst_controller = dst_device controller = src_controller json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { 'uuid' : service_uuid, 'flow_id' : flow_id, 'src_node' : src_endpoint_uuid, 'dst_node' : dst_endpoint_uuid, 'bitrate' : bitrate }) del controller.device_config.config_rules[:] controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) self.__task_executor.configure_device(controller) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('constraints', constraints, list) if len(constraints) == 0: return [] msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.' LOGGER.warning(msg.format(str(constraints))) return [True for _ in range(len(constraints))] @metered_subclass_method(METRICS_POOL) def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('constraints', constraints, list) if len(constraints) == 0: return [] msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.' LOGGER.warning(msg.format(str(constraints))) return [True for _ in range(len(constraints))] @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] for resource in resources: try: resource_value = json.loads(resource[1]) self.__settings_handler.set(resource[0], resource_value) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to SetConfig({:s})'.format(str(resource))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] for resource in resources: try: self.__settings_handler.delete(resource[0]) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteConfig({:s})'.format(str(resource))) results.append(e) return results Loading
src/service/Dockerfile +1 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,7 @@ COPY src/context/. context/ COPY src/device/. device/ COPY src/pathcomp/frontend/. pathcomp/frontend/ COPY src/service/. service/ COPY src/e2eorchestrator/. e2eorchestrator/ # Start the service ENTRYPOINT ["python", "-m", "service.service"]
src/service/service/ServiceServiceServicerImpl.py +28 −0 Original line number Diff line number Diff line Loading @@ -21,10 +21,12 @@ from common.method_wrappers.ServiceExceptions import ( from common.proto.context_pb2 import ( Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum) from common.proto.pathcomp_pb2 import PathCompRequest from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest from common.proto.service_pb2_grpc import ServiceServiceServicer from common.tools.context_queries.Service import get_service_by_id from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from context.client.ContextClient import ContextClient from e2eorchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient from pathcomp.frontend.client.PathCompClient import PathCompClient from service.service.tools.ConnectionToString import connection_to_string from service.client.TEServiceClient import TEServiceClient Loading Loading @@ -153,6 +155,32 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): str_service_status = ServiceStatusEnum.Name(service_status.service_status) raise Exception(MSG.format(service_key, str_service_status)) if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E: # End-to-End service: service_id_with_uuids = context_client.SetService(request) service_with_uuids = get_service_by_id( context_client, service_id_with_uuids, rw_copy=False, include_config_rules=True, include_constraints=True, include_endpoint_ids=True) e2e_orch_request = E2EOrchestratorRequest() e2e_orch_request.service.CopyFrom(service_with_uuids) e2e_orch_client = E2EOrchestratorClient() e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request) # Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies # inter-dependencies among the services and connections retrieved and produces a # schedule of tasks (an ordered list of tasks to be executed) to implement the # requested create/update operation. tasks_scheduler = TasksScheduler(self.service_handler_factory) # e2e_orch_reply should be compatible with pathcomp_reply # TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply() tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False) tasks_scheduler.execute_all() return service_with_uuids.service_id # Normal service del service.service_endpoint_ids[:] # pylint: disable=no-member for endpoint_id in request.service_endpoint_ids: Loading
src/service/service/service_handler_api/FilterFields.py +2 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ SERVICE_TYPE_VALUES = { ServiceTypeEnum.SERVICETYPE_L2NM, ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE, ServiceTypeEnum.SERVICETYPE_TE, ServiceTypeEnum.SERVICETYPE_E2E, } DEVICE_DRIVER_VALUES = { Loading @@ -37,6 +38,7 @@ DEVICE_DRIVER_VALUES = { DeviceDriverEnum.DEVICEDRIVER_XR, DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN, DeviceDriverEnum.DEVICEDRIVER_GNMI_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE, } # Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified Loading
src/service/service/service_handlers/__init__.py +7 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ from .microwave.MicrowaveServiceHandler import MicrowaveServiceHandler from .p4.p4_service_handler import P4ServiceHandler from .tapi_tapi.TapiServiceHandler import TapiServiceHandler from .tapi_xr.TapiXrServiceHandler import TapiXrServiceHandler from .e2e_orch.E2EOrchestratorServiceHandler import E2EOrchestratorServiceHandler SERVICE_HANDLERS = [ (L2NMEmulatedServiceHandler, [ Loading Loading @@ -86,4 +87,10 @@ SERVICE_HANDLERS = [ FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN], } ]), (E2EOrchestratorServiceHandler, [ { FilterFieldEnum.SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_E2E, FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE], } ]), ]
src/service/service/service_handlers/e2e_orch/E2EOrchestratorServiceHandler.py 0 → 100644 +176 −0 Original line number Diff line number Diff line # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging from typing import Any, Dict, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.proto.context_pb2 import ConfigRule, DeviceId, Service from common.tools.object_factory.ConfigRule import json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type from service.service.service_handler_api.Tools import get_device_endpoint_uuids from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'e2e_orch'}) class E2EOrchestratorServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called self, service : Service, task_executor : TaskExecutor, **settings ) -> None: self.__service = service self.__task_executor = task_executor self.__settings_handler = SettingsHandler(service.service_config, **settings) @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) < 2: return [] service_uuid = self.__service.service_id.service_uuid.uuid settings = self.__settings_handler.get('/settings') json_settings : Dict = {} if settings is None else settings.value bitrate = json_settings.get('bitrate', 1000) results = [] try: src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) src_controller = self.__task_executor.get_device_controller(src_device) if src_controller is None: src_controller = src_device dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[-1]) dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) dst_controller = self.__task_executor.get_device_controller(dst_device) if dst_controller is None: dst_controller = dst_device controller = src_controller json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { 'uuid' : service_uuid, 'src_node' : src_endpoint_uuid, 'dst_node' : dst_endpoint_uuid, 'bitrate' : bitrate }) del controller.device_config.config_rules[:] controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) self.__task_executor.configure_device(controller) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to SetEndpoint for Service({:s})'.format(str(service_uuid))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None ) -> List[Union[bool, Exception]]: chk_type('endpoints', endpoints, list) if len(endpoints) < 2: return [] service_uuid = self.__service.service_id.service_uuid.uuid settings = self.__settings_handler.get('/settings') json_settings : Dict = {} if settings is None else settings.value flow_id = json_settings.get('flow_id', 100) bitrate = json_settings.get('bitrate', 1000) results = [] try: src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) src_controller = self.__task_executor.get_device_controller(src_device) if src_controller is None: src_controller = src_device dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[1]) dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) dst_controller = self.__task_executor.get_device_controller(dst_device) if dst_controller is None: dst_controller = dst_device controller = src_controller json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { 'uuid' : service_uuid, 'flow_id' : flow_id, 'src_node' : src_endpoint_uuid, 'dst_node' : dst_endpoint_uuid, 'bitrate' : bitrate }) del controller.device_config.config_rules[:] controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) self.__task_executor.configure_device(controller) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('constraints', constraints, list) if len(constraints) == 0: return [] msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.' LOGGER.warning(msg.format(str(constraints))) return [True for _ in range(len(constraints))] @metered_subclass_method(METRICS_POOL) def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('constraints', constraints, list) if len(constraints) == 0: return [] msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.' LOGGER.warning(msg.format(str(constraints))) return [True for _ in range(len(constraints))] @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] for resource in resources: try: resource_value = json.loads(resource[1]) self.__settings_handler.set(resource[0], resource_value) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to SetConfig({:s})'.format(str(resource))) results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] for resource in resources: try: self.__settings_handler.delete(resource[0]) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteConfig({:s})'.format(str(resource))) results.append(e) return results