Commit b3dd640a authored by Carlos Manso's avatar Carlos Manso
Browse files

e2e_orch service handler

parent daed600d
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -66,6 +66,7 @@ COPY src/context/. context/
COPY src/device/. device/
COPY src/pathcomp/frontend/. pathcomp/frontend/
COPY src/service/. service/
COPY src/e2eorchestrator/. e2eorchestrator/

# Start the service
ENTRYPOINT ["python", "-m", "service.service"]
+29 −0
Original line number Diff line number Diff line
@@ -21,10 +21,12 @@ from common.method_wrappers.ServiceExceptions import (
from common.proto.context_pb2 import (
    Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum)
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from e2eorchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.tools.ConnectionToString import connection_to_string
from service.client.TEServiceClient import TEServiceClient
@@ -32,6 +34,7 @@ from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
from .tools.GeodesicDistance import gps_distance


LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Service', 'RPC')
@@ -153,6 +156,32 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
                str_service_status = ServiceStatusEnum.Name(service_status.service_status)
                raise Exception(MSG.format(service_key, str_service_status))

        if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E:
            # End-to-End service:
            service_id_with_uuids = context_client.SetService(request)

            service_with_uuids = get_service_by_id(
                context_client, service_id_with_uuids, rw_copy=False,
                include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

            e2e_orch_request = E2EOrchestratorRequest()
            e2e_orch_request.service.CopyFrom(service_with_uuids)

            e2e_orch_client = E2EOrchestratorClient()
            e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request)

            # Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies
            # inter-dependencies among the services and connections retrieved and produces a
            # schedule of tasks (an ordered list of tasks to be executed) to implement the
            # requested create/update operation.
            tasks_scheduler = TasksScheduler(self.service_handler_factory)
            # e2e_orch_reply should be compatible with pathcomp_reply
            # TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply()
            tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False)
            tasks_scheduler.execute_all()
            return service_with_uuids.service_id


        # Normal service
        del service.service_endpoint_ids[:] # pylint: disable=no-member
        for endpoint_id in request.service_endpoint_ids:
+2 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ SERVICE_TYPE_VALUES = {
    ServiceTypeEnum.SERVICETYPE_L2NM,
    ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE,
    ServiceTypeEnum.SERVICETYPE_TE,
    ServiceTypeEnum.SERVICETYPE_E2E,
}

DEVICE_DRIVER_VALUES = {
@@ -37,6 +38,7 @@ DEVICE_DRIVER_VALUES = {
    DeviceDriverEnum.DEVICEDRIVER_XR,
    DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN,
    DeviceDriverEnum.DEVICEDRIVER_GNMI_OPENCONFIG,
    DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE,
}

# Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified
+8 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ from .microwave.MicrowaveServiceHandler import MicrowaveServiceHandler
from .p4.p4_service_handler import P4ServiceHandler
from .tapi_tapi.TapiServiceHandler import TapiServiceHandler
from .tapi_xr.TapiXrServiceHandler import TapiXrServiceHandler
from .e2e_orch.E2EOrchestratorServiceHandler import E2EOrchestratorServiceHandler


SERVICE_HANDLERS = [
    (L2NMEmulatedServiceHandler, [
@@ -86,4 +88,10 @@ SERVICE_HANDLERS = [
            FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_IETF_L2VPN],
        }
    ]),
    (E2EOrchestratorServiceHandler, [
        {
            FilterFieldEnum.SERVICE_TYPE  : ServiceTypeEnum.SERVICETYPE_E2E,
            FilterFieldEnum.DEVICE_DRIVER : [DeviceDriverEnum.DEVICEDRIVER_FLEXSCALE],
        }
    ]),
]
+176 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging
from typing import Any, Dict, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, DeviceId, Service
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type
from service.service.service_handler_api.Tools import get_device_endpoint_uuids
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.SettingsHandler import SettingsHandler
from service.service.task_scheduler.TaskExecutor import TaskExecutor

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'e2e_orch'})

class E2EOrchestratorServiceHandler(_ServiceHandler):
    def __init__(   # pylint: disable=super-init-not-called
        self, service : Service, task_executor : TaskExecutor, **settings
    ) -> None:
        self.__service = service
        self.__task_executor = task_executor
        self.__settings_handler = SettingsHandler(service.service_config, **settings)

    @metered_subclass_method(METRICS_POOL)
    def SetEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:

        chk_type('endpoints', endpoints, list)
        if len(endpoints) < 2: return []

        service_uuid = self.__service.service_id.service_uuid.uuid
        settings = self.__settings_handler.get('/settings')
        json_settings : Dict = {} if settings is None else settings.value
        bitrate = json_settings.get('bitrate', 1000)

        results = []
        try:
            src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0])
            src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid)))
            src_controller = self.__task_executor.get_device_controller(src_device)
            if src_controller is None: src_controller = src_device

            dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[-1])
            dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid)))
            dst_controller = self.__task_executor.get_device_controller(dst_device)
            if dst_controller is None: dst_controller = dst_device

            controller = src_controller

            json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), {
                'uuid'                    : service_uuid,
                'src_node'                : src_endpoint_uuid,
                'dst_node'                : dst_endpoint_uuid,
                'bitrate'                 : bitrate
            })
            del controller.device_config.config_rules[:]
            controller.device_config.config_rules.append(ConfigRule(**json_config_rule))
            self.__task_executor.configure_device(controller)
            results.append(True)
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Unable to SetEndpoint for Service({:s})'.format(str(service_uuid)))
            results.append(e)

        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:

        chk_type('endpoints', endpoints, list)
        if len(endpoints) < 2: return []

        service_uuid = self.__service.service_id.service_uuid.uuid
        settings = self.__settings_handler.get('/settings')
        json_settings : Dict = {} if settings is None else settings.value
        flow_id = json_settings.get('flow_id', 100)
        bitrate = json_settings.get('bitrate', 1000)

        results = []
        try:
            src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0])
            src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid)))
            src_controller = self.__task_executor.get_device_controller(src_device)
            if src_controller is None: src_controller = src_device

            dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[1])
            dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid)))
            dst_controller = self.__task_executor.get_device_controller(dst_device)
            if dst_controller is None: dst_controller = dst_device

            controller = src_controller

            json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), {
                'uuid'                    : service_uuid,
                'flow_id'                 : flow_id,
                'src_node'                : src_endpoint_uuid,
                'dst_node'                : dst_endpoint_uuid,
                'bitrate'                 : bitrate
            })

            del controller.device_config.config_rules[:]
            controller.device_config.config_rules.append(ConfigRule(**json_config_rule))
            self.__task_executor.configure_device(controller)
            results.append(True)
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid)))
            results.append(e)

        return results

    @metered_subclass_method(METRICS_POOL)
    def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('constraints', constraints, list)
        if len(constraints) == 0: return []

        msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(constraints)))
        return [True for _ in range(len(constraints))]

    @metered_subclass_method(METRICS_POOL)
    def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('constraints', constraints, list)
        if len(constraints) == 0: return []

        msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(constraints)))
        return [True for _ in range(len(constraints))]

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []

        results = []
        for resource in resources:
            try:
                resource_value = json.loads(resource[1])
                self.__settings_handler.set(resource[0], resource_value)
                results.append(True)
            except Exception as e: # pylint: disable=broad-except
                LOGGER.exception('Unable to SetConfig({:s})'.format(str(resource)))
                results.append(e)

        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []

        results = []
        for resource in resources:
            try:
                self.__settings_handler.delete(resource[0])
            except Exception as e: # pylint: disable=broad-except
                LOGGER.exception('Unable to DeleteConfig({:s})'.format(str(resource)))
                results.append(e)

        return results
Loading