Skip to content
Snippets Groups Projects
Objects.py 8.98 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, List, Optional, Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Connection import json_connection, json_connection_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Constraint import json_constraint_custom, json_constraint_sla_latency
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import json_device_id, json_device_packetrouter_disabled
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from common.tools.object_factory.Link import json_link, json_link_id
from common.tools.object_factory.Service import json_service_id, json_service_l3nm_planned
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Slice import json_slice_id, json_slice
from common.tools.object_factory.Topology import json_topology, json_topology_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.PolicyRule import json_policyrule, json_policyrule_id


# ----- Context --------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
CONTEXT_NAME = DEFAULT_CONTEXT_NAME
CONTEXT_ID   = json_context_id(CONTEXT_NAME)
CONTEXT      = json_context(CONTEXT_NAME, name=CONTEXT_NAME)


# ----- Topology -------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
TOPOLOGY_NAME = DEFAULT_TOPOLOGY_NAME
TOPOLOGY_ID   = json_topology_id(TOPOLOGY_NAME, context_id=CONTEXT_ID)
TOPOLOGY      = json_topology(TOPOLOGY_NAME, context_id=CONTEXT_ID, name=TOPOLOGY_NAME)


# ----- KPI Sample Types -----------------------------------------------------------------------------------------------
PACKET_PORT_SAMPLE_TYPES = [
    KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED,
    KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED,
]


# ----- Device ---------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def compose_device(name : str, endpoint_names : List[str]) -> Tuple[str, Dict, Dict]:
    device_id = json_device_id(name)
    endpoints = [
        json_endpoint(device_id, endpoint_name, 'copper', topology_id=TOPOLOGY_ID,
            kpi_sample_types=PACKET_PORT_SAMPLE_TYPES)
        for endpoint_name in endpoint_names
    ]
    config_rules = [
        json_config_rule_set('dev/rsrc1/value', 'value1'),
        json_config_rule_set('dev/rsrc2/value', 'value2'),
        json_config_rule_set('dev/rsrc3/value', 'value3'),
    ]
    device = json_device_packetrouter_disabled(name, endpoints=endpoints, config_rules=config_rules)
    return name, device_id, device

DEVICE_R1_NAME, DEVICE_R1_ID, DEVICE_R1 = compose_device('R1', ['1.2', '1.3', '2.2', '2.3'])
DEVICE_R2_NAME, DEVICE_R2_ID, DEVICE_R2 = compose_device('R2', ['1.1', '1.3', '2.1', '2.3'])
DEVICE_R3_NAME, DEVICE_R3_ID, DEVICE_R3 = compose_device('R3', ['1.1', '1.2', '2.1', '2.2'])


# ----- Link -----------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def compose_link(
    name : str, endpoint_ids : List[Tuple[str, str]],
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    total_capacity_gbps : Optional[float] = None, used_capacity_gbps : Optional[float] = None
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
) -> Tuple[str, Dict, Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    link_id = json_link_id(name)
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    link = json_link(
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        name, endpoint_ids, total_capacity_gbps=total_capacity_gbps, used_capacity_gbps=used_capacity_gbps
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    return name, link_id, link
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LINK_R1_R2_NAME, LINK_R1_R2_ID, LINK_R1_R2 = compose_link(
    'R1==R2', [(DEVICE_R1_ID, '1.2'), (DEVICE_R2_ID, '1.1')],
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    total_capacity_gbps=100, # used_capacity_gbps=None => used_capacity_gbps=total_capacity_gbps
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
)
LINK_R2_R3_NAME, LINK_R2_R3_ID, LINK_R2_R3 = compose_link(
    'R2==R3', [(DEVICE_R2_ID, '1.3'), (DEVICE_R3_ID, '1.2')],
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    total_capacity_gbps=100, # used_capacity_gbps=None => used_capacity_gbps=total_capacity_gbps
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
)
LINK_R1_R3_NAME, LINK_R1_R3_ID, LINK_R1_R3 = compose_link(
    'R1==R3', [(DEVICE_R1_ID, '1.3'), (DEVICE_R3_ID, '1.1')],
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    total_capacity_gbps=100, # used_capacity_gbps=None => used_capacity_gbps=total_capacity_gbps


# ----- Service --------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def compose_service(
    name : str, endpoint_ids : List[Tuple[str, str]], latency_ms : float, jitter_us : float
) -> Tuple[str, Dict, Dict]:
    service_id = json_service_id(name, context_id=CONTEXT_ID)
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]
    constraints = [
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        json_constraint_sla_latency(latency_ms),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        json_constraint_custom('jitter[us]',  str(jitter_us)),
    ]
    config_rules = [
        json_config_rule_set('svc/rsrc1/value', 'value7'),
        json_config_rule_set('svc/rsrc2/value', 'value8'),
        json_config_rule_set('svc/rsrc3/value', 'value9'),
    ]
    service = json_service_l3nm_planned(
        name, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
    return name, service_id, service

SERVICE_R1_R2_NAME, SERVICE_R1_R2_ID, SERVICE_R1_R2 = compose_service(
    'R1-R2', [(DEVICE_R1_ID, '2.2'), (DEVICE_R2_ID, '2.1')], 15.2, 1.2)

SERVICE_R1_R3_NAME, SERVICE_R1_R3_ID, SERVICE_R1_R3 = compose_service(
    'R1-R3', [(DEVICE_R1_ID, '2.3'), (DEVICE_R3_ID, '2.1')], 5.8, 0.1)

SERVICE_R2_R3_NAME, SERVICE_R2_R3_ID, SERVICE_R2_R3 = compose_service(
    'R2-R3', [(DEVICE_R2_ID, '2.3'), (DEVICE_R3_ID, '2.2')], 23.1, 3.4)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Slice ----------------------------------------------------------------------------------------------------------
def compose_slice(
    name : str, endpoint_ids : List[Tuple[str, str]], latency_ms : float, jitter_us : float,
    service_ids : List[Dict] = [], subslice_ids : List[Dict] = [], owner : Optional[Dict] = None
) -> Tuple[str, Dict, Dict]:
    slice_id = json_slice_id(name, context_id=CONTEXT_ID)
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]
    constraints = [
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        json_constraint_sla_latency(latency_ms),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        json_constraint_custom('jitter[us]',  str(jitter_us)),
    ]
    config_rules = [
        json_config_rule_set('svc/rsrc1/value', 'value7'),
        json_config_rule_set('svc/rsrc2/value', 'value8'),
        json_config_rule_set('svc/rsrc3/value', 'value9'),
    ]
    slice_ = json_slice(
        name, context_id=CONTEXT_ID, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules,
        service_ids=service_ids, subslice_ids=subslice_ids, owner=owner)
    return name, slice_id, slice_

SLICE_R1_R3_NAME, SLICE_R1_R3_ID, SLICE_R1_R3 = compose_slice(
    'R1-R3', [(DEVICE_R1_ID, '2.3'), (DEVICE_R3_ID, '2.1')], 15.2, 1.2,
    service_ids=[SERVICE_R1_R2_ID, SERVICE_R2_R3_ID],
    subslice_ids=[], owner=None)


# ----- Connection -----------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def compose_connection(
    name : str, service_id : Dict, endpoint_ids : List[Tuple[str, str]], sub_service_ids : List[Dict] = []
) -> Tuple[str, Dict, Dict]:
    connection_id = json_connection_id(name)
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]
    connection = json_connection(
        name, service_id=service_id, path_hops_endpoint_ids=endpoint_ids, sub_service_ids=sub_service_ids)
    return name, connection_id, connection

CONNECTION_R1_R3_NAME, CONNECTION_R1_R3_ID, CONNECTION_R1_R3 = compose_connection(
    'CON:R1/2.3-R3/2.1', SERVICE_R1_R3_ID, [
        (DEVICE_R1_ID, '2.3'),
        (DEVICE_R1_ID, '1.2'), (DEVICE_R2_ID, '1.1'),
        (DEVICE_R2_ID, '1.3'), (DEVICE_R3_ID, '1.2'),
        (DEVICE_R3_ID, '2.1')
    ], sub_service_ids=[SERVICE_R1_R2_ID, SERVICE_R2_R3_ID])


# ----- PolicyRule -------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
POLICYRULE_NAME = 'my-device-policy'
POLICYRULE_ID   = json_policyrule_id(POLICYRULE_NAME)
POLICYRULE      = json_policyrule(POLICYRULE_NAME, policy_priority=1)