Commit 7c8d37e9 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

feat: initial version of L3NM NCE service handler added

parent c31a36bf
Loading
Loading
Loading
Loading
+121 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List

from common.tools.object_factory.ConfigRule import (
    json_config_rule_delete,
    json_config_rule_set,
)
from service.service.service_handler_api.AnyTreeTools import TreeNode


def setup_config_rules(service_uuid: str, json_settings: Dict) -> List[Dict]:
    operation_type: str = json_settings["operation_type"]
    app_flow_id: str = json_settings["app_flow_id"]
    app_flow_user_id: str = json_settings["app_flow_user_id"]
    max_latency: int = json_settings["max_latency"]
    max_jitter: int = json_settings["max_jitter"]
    max_loss: float = json_settings["max_loss"]
    upstream_assure_bw: str = json_settings["upstream_assure_bw"]
    upstream_max_bw: str = json_settings["upstream_max_bw"]
    downstream_assure_bw: str = json_settings["downstream_assure_bw"]
    downstream_max_bw: str = json_settings["downstream_max_bw"]
    src_ip: str = json_settings["src_ip"]
    src_port: str = json_settings["src_port"]
    dst_ip: str = json_settings["dst_ip"]
    dst_port: str = json_settings["dst_port"]

    app_flow_app_name: str = f"App_Flow_{app_flow_id}"
    app_flow_service_profile: str = f"service_{app_flow_id}"
    app_id: str = f"app_{app_flow_id}"
    app_feature_id: str = f"feature_{app_flow_id}"
    app_flow_name: str = json_settings.get("app_flow_name", "App_Flow_Example")
    app_flow_max_online_users: int = json_settings.get("app_flow_max_online_users", 1)
    app_flow_stas: str = json_settings.get("stas", "00:3D:E1:18:82:9E")
    qos_profile_name: str = json_settings.get("app_flow_qos_profile", "AR_VR_Gaming")
    app_flow_duration: int = json_settings.get("app_flow_duration", 9999)
    protocol: str = json_settings.get("protocol", "tcp")

    app_flow = {
        "name": app_flow_name,
        "user-id": app_flow_user_id,
        "app-name": app_flow_app_name,
        "max-online-users": app_flow_max_online_users,
        "stas": app_flow_stas,
        "qos-profile": qos_profile_name,
        "service-profile": app_flow_service_profile,
        "duration": app_flow_duration,
    }
    qos_profile = {
        "name": qos_profile_name,
        "max-latency": max_latency,
        "max-jitter": max_jitter,
        "max-loss": max_loss,
        "upstream": {
            "assure-bandwidth": upstream_assure_bw,
            "max-bandwidth": upstream_max_bw,
        },
        "downstream": {
            "assure-bandwidth": downstream_assure_bw,
            "max-bandwidth": downstream_max_bw,
        },
    }
    application = {
        "name": app_flow_name,
        "app-id": app_id,
        "app-features": {
            "app-feature": [
                {
                    "id": app_feature_id,
                    "dest-ip": dst_ip,
                    "dest-port": dst_port,
                    "src-ip": src_ip,
                    "src-port": src_port,
                    "protocol": protocol,
                }
            ]
        },
    }
    app_flow_datamodel = {
        "huawei-nce-app-flow:app-flows": {
            "app-flow": [app_flow],
            "qos-profiles": {"qos-profile": [qos_profile]},
            "applications": {"application": [application]},
        }
    }
    json_config_rules = [
        json_config_rule_set(
            "/service[{:s}]/AppFlow".format(service_uuid), app_flow_datamodel
        ),
        json_config_rule_set(
            "/service[{:s}]/AppFlow/operation".format(service_uuid),
            {"type": operation_type},
        ),
    ]
    return json_config_rules


def teardown_config_rules(service_uuid: str, json_settings: Dict) -> List[Dict]:
    json_config_rules = [
        json_config_rule_delete(
            "/service[{:s}]/AppFlow".format(service_uuid),
            {},
        ),
        json_config_rule_delete(
            "/service[{:s}]/AppFlow/operation".format(service_uuid),
            {},
        ),
    ]
    return json_config_rules
+446 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import re
from typing import Any, List, Optional, Tuple, Union
from uuid import uuid4

from deepdiff import DeepDiff

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.proto.context_pb2 import ConfigRule, DeviceId, Empty, Service, ServiceConfig
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type
from context.client.ContextClient import ContextClient
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.SettingsHandler import SettingsHandler
from service.service.service_handler_api.Tools import (
    get_device_endpoint_uuids,
    get_endpoint_matching,
)
from service.service.task_scheduler.TaskExecutor import TaskExecutor

from .ConfigRules import setup_config_rules, teardown_config_rules

RUNNING_RESOURCE_KEY = "running_ietf_slice"
CANDIDATE_RESOURCE_KEY = "candidate_ietf_slice"

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool("Service", "Handler", labels={"handler": "l3nm_nce"})
SDP_DIFF_RE = re.compile(
    r"^root\[\'network-slice-services\'\]\[\'slice-service\'\]\[0\]\[\'sdps\'\]\[\'sdp\'\]\[(\d)\]$"
)
CONNECTION_GROUP_DIFF_RE = re.compile(
    r"^root\[\'network-slice-services\'\]\[\'slice-service\'\]\[0\]\[\'connection-groups\'\]\[\'connection-group\'\]\[(\d)\]$"
)
MATCH_CRITERION_DIFF_RE = re.compile(
    r"^root\[\'network-slice-services\'\]\[\'slice-service\'\]\[0\]\[\'sdps\'\]\[\'sdp\'\]\[(\d)\]\[\'service-match-criteria\'\]\[\'match-criterion\'\]\[(\d)\]$"
)


def get_custom_config_rule(
    service_config: ServiceConfig, resource_key: str
) -> Optional[ConfigRule]:
    for cr in service_config.config_rules:
        if (
            cr.WhichOneof("config_rule") == "custom"
            and cr.custom.resource_key == resource_key
        ):
            return cr


def get_running_candidate_ietf_slice_data_diff(service_config: ServiceConfig) -> dict:
    running_ietf_slice_cr = get_custom_config_rule(service_config, RUNNING_RESOURCE_KEY)
    running_resource_value_dict = json.loads(
        running_ietf_slice_cr.custom.resource_value
    )
    candidate_ietf_slice_cr = get_custom_config_rule(
        service_config, CANDIDATE_RESOURCE_KEY
    )
    candidate_resource_value_dict = json.loads(
        candidate_ietf_slice_cr.custom.resource_value
    )
    return (
        DeepDiff(
            running_resource_value_dict,
            candidate_resource_value_dict,
        ),
        DeepDiff(
            running_resource_value_dict,
            candidate_resource_value_dict,
            ignore_order=True,
        ),
    )


class L3NMNCEServiceHandler(_ServiceHandler):
    def __init__(  # pylint: disable=super-init-not-called
        self, service: Service, task_executor: TaskExecutor, **settings
    ) -> None:
        self.__service = service
        self.__task_executor = task_executor
        self.__settings_handler = SettingsHandler(service.service_config, **settings)

    @metered_subclass_method(METRICS_POOL)
    def SetEndpoint(
        self,
        endpoints: List[Tuple[str, str, Optional[str]]],
        connection_uuid: Optional[str] = None,
    ) -> List[Union[bool, Exception]]:
        chk_type("endpoints", endpoints, list)
        if len(endpoints) == 0:
            return []
        context_client = ContextClient()
        service_uuid = self.__service.service_id.service_uuid.uuid
        service_name = self.__service.name
        service_config = self.__service.service_config
        settings = self.__settings_handler.get("/settings")
        src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0])
        src_device_obj = self.__task_executor.get_device(
            DeviceId(**json_device_id(src_device_uuid))
        )
        controller = self.__task_executor.get_device_controller(src_device_obj)
        list_devices = context_client.ListDevices(Empty())
        devices = list_devices.devices
        device_name_device = {d.name: d for d in devices}
        device_uuid_device = {d.device_id.device_uuid.uuid: d for d in devices}
        running_candidate_diff, running_candidate_diff_no_order = (
            get_running_candidate_ietf_slice_data_diff(service_config)
        )
        candidate_ietf_slice_cr = get_custom_config_rule(
            service_config, CANDIDATE_RESOURCE_KEY
        )
        candidate_resource_value_dict = json.loads(
            candidate_ietf_slice_cr.custom.resource_value
        )
        running_ietf_slice_cr = get_custom_config_rule(
            service_config, RUNNING_RESOURCE_KEY
        )
        running_resource_value_dict = json.loads(
            running_ietf_slice_cr.custom.resource_value
        )
        LOGGER.debug(f"P70: {running_candidate_diff}")
        if not running_candidate_diff:  # Slice Creation
            operation_type = "create"
            slice_services = candidate_resource_value_dict["network-slice-services"][
                "slice-service"
            ]
            slice_service = slice_services[0]
            sdps = slice_service["sdps"]["sdp"]
            connection_groups = slice_service["connection-groups"]["connection-group"]
            sdp_ids = [sdp["id"] for sdp in sdps]
            for sdp in sdps:
                node_id = sdp["node-id"]
                device_obj = device_name_device[node_id]
                device_controller = self.__task_executor.get_device_controller(
                    device_obj
                )
                LOGGER.debug(f"P71: {controller}")
                LOGGER.debug(f"P72: {device_controller}")
                if device_controller is None or controller.name != device_controller.name:
                    continue
                src_sdp_idx = sdp_ids.pop(sdp_ids.index(sdp["id"]))
                dst_sdp_idx = sdp_ids[0]
                match_criteria = sdp["service-match-criteria"]["match-criterion"]
                match_criterion = match_criteria[0]
                connection_grp_id = match_criterion["target-connection-group-id"]
                break
            else:
                raise Exception("connection group id not found")
        elif "iterable_item_added" in running_candidate_diff:  # new SDP added
            slice_services = candidate_resource_value_dict["network-slice-services"][
                "slice-service"
            ]
            slice_service = slice_services[0]
            sdps = slice_service["sdps"]["sdp"]
            connection_groups = slice_service["connection-groups"]["connection-group"]
            operation_type = "create"
            added_items = {
                "sdp": {"sdp_idx": None, "value": {}},
                "connection_group": {"connection_group_idx": None, "value": {}},
                "match_criterion": {
                    "sdp_idx": None,
                    "match_criterion_idx": None,
                    "value": {},
                },
            }
            for added_key, added_value in running_candidate_diff[
                "iterable_item_added"
            ].items():
                sdp_match = SDP_DIFF_RE.match(added_key)
                connection_group_match = CONNECTION_GROUP_DIFF_RE.match(added_key)
                match_criterion_match = MATCH_CRITERION_DIFF_RE.match(added_key)
                if sdp_match:
                    added_items["sdp"] = {
                        "sdp_idx": int(sdp_match.groups()[0]),
                        "value": added_value,
                    }
                elif connection_group_match:
                    added_items["connection_group"] = {
                        "connection_group_idx": int(connection_group_match.groups()[0]),
                        "value": added_value,
                    }
                elif match_criterion_match:
                    added_items["match_criterion"] = {
                        "sdp_idx": int(match_criterion_match.groups()[0]),
                        "match_criterion_idx": int(match_criterion_match.groups()[1]),
                        "value": added_value,
                    }
            new_sdp = sdps[added_items["sdp"]["sdp_idx"]]
            src_sdp_idx = new_sdp["id"]
            dst_sdp_idx = sdps[added_items["match_criterion"]["sdp_idx"]]["id"]
            connection_grp_id = connection_groups[
                added_items["connection_group"]["connection_group_idx"]
            ]["id"]
            if (
                connection_grp_id
                != added_items["match_criterion"]["value"]["target-connection-group-id"]
            ):
                raise Exception(
                    "connection group missmatch in destination sdp and added connection group"
                )
            match_criteria = new_sdp["service-match-criteria"]["match-criterion"]
            match_criterion = match_criteria[0]
        elif "iterable_item_removed" in running_candidate_diff:  # new SDP added
            slice_services = running_resource_value_dict["network-slice-services"][
                "slice-service"
            ]
            slice_service = slice_services[0]
            sdps = slice_service["sdps"]["sdp"]
            connection_groups = slice_service["connection-groups"]["connection-group"]
            operation_type = "delete"
            added_items = {
                "sdp": {"sdp_idx": None, "value": {}},
                "connection_group": {"connection_group_idx": None, "value": {}},
                "match_criterion": {
                    "sdp_idx": None,
                    "match_criterion_idx": None,
                    "value": {},
                },
            }
            for added_key, added_value in running_candidate_diff[
                "iterable_item_removed"
            ].items():
                sdp_match = SDP_DIFF_RE.match(added_key)
                connection_group_match = CONNECTION_GROUP_DIFF_RE.match(added_key)
                match_criterion_match = MATCH_CRITERION_DIFF_RE.match(added_key)
                if sdp_match:
                    added_items["sdp"] = {
                        "sdp_idx": int(sdp_match.groups()[0]),
                        "value": added_value,
                    }
                elif connection_group_match:
                    added_items["connection_group"] = {
                        "connection_group_idx": int(connection_group_match.groups()[0]),
                        "value": added_value,
                    }
                elif match_criterion_match:
                    added_items["match_criterion"] = {
                        "sdp_idx": int(match_criterion_match.groups()[0]),
                        "match_criterion_idx": int(match_criterion_match.groups()[1]),
                        "value": added_value,
                    }
            new_sdp = sdps[added_items["sdp"]["sdp_idx"]]
            src_sdp_idx = new_sdp["id"]
            dst_sdp_idx = sdps[added_items["match_criterion"]["sdp_idx"]]["id"]
            connection_grp_id = connection_groups[
                added_items["connection_group"]["connection_group_idx"]
            ]["id"]
            if (
                connection_grp_id
                != added_items["match_criterion"]["value"][
                    "target-connection-group-id"
                ]
            ):
                raise Exception(
                    "connection group missmatch in destination sdp and added connection group"
                )
            match_criteria = new_sdp["service-match-criteria"]["match-criterion"]
            match_criterion = match_criteria[0]
        for type_value in match_criterion["match-type"]:
            if type_value["type"] == "ietf-network-slice-service:source-ip-prefix":
                src_ip = type_value["value"][0].split("/")[0]
            elif (
                type_value["type"] == "ietf-network-slice-service:destination-ip-prefix"
            ):
                dst_ip = type_value["value"][0].split("/")[0]
            elif type_value["type"] == "ietf-network-slice-service:source-tcp-port":
                src_port = type_value["value"][0]
            elif (
                type_value["type"] == "ietf-network-slice-service:destination-tcp-port"
            ):
                dst_port = type_value["value"][0]
        qos_info = {
            "upstream": {"max_delay": "0", "bw": "0", "packet_loss": "0"},
            "downstream": {"max_delay": "0", "bw": "0", "packet_loss": "0"},
        }
        for cg in connection_groups:
            if cg["id"] != connection_grp_id:
                continue
            for cc in cg["connectivity-construct"]:
                if (
                    cc["p2p-sender-sdp"] == src_sdp_idx
                    and cc["p2p-receiver-sdp"] == dst_sdp_idx
                ):
                    direction = "upstream"
                elif (
                    cc["p2p-sender-sdp"] == dst_sdp_idx
                    and cc["p2p-receiver-sdp"] == src_sdp_idx
                ):
                    direction = "downstream"
                else:
                    raise Exception("invalid sender and receiver sdp ids")
                for metric_bound in cc["service-slo-sle-policy"]["slo-policy"][
                    "metric-bound"
                ]:
                    if (
                        metric_bound["metric-type"]
                        == "ietf-network-slice-service:one-way-delay-maximum"
                        and metric_bound["metric-unit"] == "milliseconds"
                    ):
                        qos_info[direction]["max_delay"] = metric_bound["bound"]
                    elif (
                        metric_bound["metric-type"]
                        == "ietf-network-slice-service:one-way-bandwidth"
                        and metric_bound["metric-unit"] == "Mbps"
                    ):
                        qos_info[direction]["bw"] = metric_bound["bound"]
                    elif (
                        metric_bound["metric-type"]
                        == "ietf-network-slice-service:two-way-packet-loss"
                        and metric_bound["metric-unit"] == "percentage"
                    ):
                        qos_info[direction]["packet_loss"] = metric_bound[
                            "percentile-value"
                        ]
            break
        results = []
        resource_value_dict = {
            "uuid": service_uuid,
            "operation_type": operation_type,
            "app_flow_id": f"{src_sdp_idx}_{dst_sdp_idx}_{service_name}",
            "app_flow_user_id": str(uuid4()),
            "max_latency": int(qos_info["upstream"]["max_delay"]),
            "max_jitter": 10,
            "max_loss": float(qos_info["upstream"]["packet_loss"]),
            "upstream_assure_bw": int(qos_info["upstream"]["bw"]) * 1e6,
            "upstream_max_bw": 2 * int(qos_info["upstream"]["bw"]) * 1e6,
            "downstream_assure_bw": int(qos_info["downstream"]["bw"]) * 1e6,
            "downstream_max_bw": 2 * int(qos_info["downstream"]["bw"]) * 1e6,
            "src_ip": src_ip,
            "src_port": src_port,
            "dst_ip": dst_ip,
            "dst_port": dst_port,
        }
        json_config_rules = setup_config_rules(service_uuid, resource_value_dict)
        del controller.device_config.config_rules[:]
        for jcr in json_config_rules:
            controller.device_config.config_rules.append(ConfigRule(**jcr))
        self.__task_executor.configure_device(controller)
        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteEndpoint(
        self,
        endpoints: List[Tuple[str, str, Optional[str]]],
        connection_uuid: Optional[str] = None,
    ) -> List[Union[bool, Exception]]:
        chk_type("endpoints", endpoints, list)
        if len(endpoints) == 0:
            return []
        service_uuid = self.__service.service_id.service_uuid.uuid
        results = []
        try:
            src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0])
            src_device_obj = self.__task_executor.get_device(
                DeviceId(**json_device_id(src_device_uuid))
            )
            controller = self.__task_executor.get_device_controller(src_device_obj)
            json_config_rules = teardown_config_rules(service_uuid, {})
            if len(json_config_rules) > 0:
                del controller.device_config.config_rules[:]
                for json_config_rule in json_config_rules:
                    controller.device_config.config_rules.append(
                        ConfigRule(**json_config_rule)
                    )
                self.__task_executor.configure_device(controller)
            results.append(True)
        except Exception as e:  # pylint: disable=broad-except
            results.append(e)
        return results

    @metered_subclass_method(METRICS_POOL)
    def SetConstraint(
        self, constraints: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        chk_type("constraints", constraints, list)
        if len(constraints) == 0:
            return []

        msg = "[SetConstraint] Method not implemented. Constraints({:s}) are being ignored."
        LOGGER.warning(msg.format(str(constraints)))
        return [True for _ in range(len(constraints))]

    @metered_subclass_method(METRICS_POOL)
    def DeleteConstraint(
        self, constraints: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        chk_type("constraints", constraints, list)
        if len(constraints) == 0:
            return []

        msg = "[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored."
        LOGGER.warning(msg.format(str(constraints)))
        return [True for _ in range(len(constraints))]

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        chk_type("resources", resources, list)
        if len(resources) == 0:
            return []

        results = []
        for resource in resources:
            try:
                resource_value = json.loads(resource[1])
                self.__settings_handler.set(resource[0], resource_value)
                results.append(True)
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception("Unable to SetConfig({:s})".format(str(resource)))
                results.append(e)

        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        chk_type("resources", resources, list)
        if len(resources) == 0:
            return []

        results = []
        for resource in resources:
            try:
                self.__settings_handler.delete(resource[0])
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception("Unable to DeleteConfig({:s})".format(str(resource)))
                results.append(e)

        return results
+14 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.