# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Tuple

from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Connection import json_connection, json_connection_id
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import json_device_id, json_device_packetrouter_disabled
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from common.tools.object_factory.Link import json_link, json_link_id
from common.tools.object_factory.Service import json_service_id, json_service_l3nm_planned
from common.tools.object_factory.Slice import json_slice_id, json_slice
from common.tools.object_factory.Topology import json_topology, json_topology_id
from common.tools.object_factory.PolicyRule import json_policy_rule, json_policy_rule_id
# ----- Context --------------------------------------------------------------------------------------------------------
# The sample context reuses the project-wide default context name; the same name is used
# both as the key passed to the factories and as the `name` attribute of the object.
CONTEXT_NAME = DEFAULT_CONTEXT_NAME
CONTEXT_ID = json_context_id(CONTEXT_NAME)
CONTEXT = json_context(CONTEXT_NAME, name=CONTEXT_NAME)
# ----- Topology -------------------------------------------------------------------------------------------------------
# The sample topology reuses the project-wide default topology name and is attached to
# the context identified by CONTEXT_ID.
TOPOLOGY_NAME = DEFAULT_TOPOLOGY_NAME
TOPOLOGY_ID = json_topology_id(TOPOLOGY_NAME, context_id=CONTEXT_ID)
TOPOLOGY = json_topology(TOPOLOGY_NAME, context_id=CONTEXT_ID, name=TOPOLOGY_NAME)
# ----- KPI Sample Types -----------------------------------------------------------------------------------------------
# KPI sample types passed as `kpi_sample_types` to every endpoint created by compose_device():
# packet and byte counters, each in the transmitted and received direction.
PACKET_PORT_SAMPLE_TYPES = [
KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED,
KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED,
KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED,
KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED,
]
# ----- Device ---------------------------------------------------------------------------------------------------------
def compose_device(name : str, endpoint_names : List[str]) -> Tuple[str, Dict, Dict]:
    """Build a sample device together with its id.

    The device is produced by ``json_device_packetrouter_disabled`` and carries one
    endpoint per entry of ``endpoint_names`` plus three fixed configuration rules.

    Args:
        name: device name; used to derive the device id and forwarded to the device factory.
        endpoint_names: names of the endpoints to create on the device, in order.

    Returns:
        A ``(name, device_id, device)`` tuple.
    """
    device_id = json_device_id(name)

    # Every endpoint is typed 'copper', bound to TOPOLOGY_ID and exposes the packet-port KPIs.
    endpoints = []
    for endpoint_name in endpoint_names:
        endpoints.append(json_endpoint(
            device_id, endpoint_name, 'copper', topology_id=TOPOLOGY_ID,
            kpi_sample_types=PACKET_PORT_SAMPLE_TYPES))

    # Fixed resource-key/value pairs installed on every device, in this order.
    resource_values = (
        ('dev/rsrc1/value', 'value1'),
        ('dev/rsrc2/value', 'value2'),
        ('dev/rsrc3/value', 'value3'),
    )
    config_rules = [
        json_config_rule_set(resource_key, resource_value)
        for resource_key, resource_value in resource_values
    ]

    device = json_device_packetrouter_disabled(name, endpoints=endpoints, config_rules=config_rules)
    return name, device_id, device
# Three sample devices R1..R3, each with four endpoints. Endpoint names look like
# '<group>.<peer>': every router has a '1.x' and a '2.x' endpoint for each of the two
# other routers (e.g. R1 has x.2 and x.3) -- naming convention inferred from the values below.
DEVICE_R1_NAME, DEVICE_R1_ID, DEVICE_R1 = compose_device('R1', ['1.2', '1.3', '2.2', '2.3'])
DEVICE_R2_NAME, DEVICE_R2_ID, DEVICE_R2 = compose_device('R2', ['1.1', '1.3', '2.1', '2.3'])
DEVICE_R3_NAME, DEVICE_R3_ID, DEVICE_R3 = compose_device('R3', ['1.1', '1.2', '2.1', '2.2'])
# ----- Link -----------------------------------------------------------------------------------------------------------
def compose_link(name : str, endpoint_ids : List[Tuple[str, str]]) -> Tuple[str, Dict, Dict]:
    """Build a sample link together with its id.

    Args:
        name: link name; used to derive the link id and forwarded to the link factory.
        endpoint_ids: ``(device_id, endpoint_name)`` pairs naming the endpoints the link
            attaches to. Each pair is resolved into an endpoint id bound to TOPOLOGY_ID.

    Returns:
        A ``(name, link_id, link)`` tuple.
    """
    link_id = json_link_id(name)

    # Resolve the (device, endpoint-name) pairs into endpoint ids, preserving order.
    resolved_endpoint_ids = []
    for device_id, endpoint_name in endpoint_ids:
        resolved_endpoint_ids.append(
            json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID))

    return name, link_id, json_link(name, resolved_endpoint_ids)
# One link per router pair (R1-R2, R2-R3, R1-R3), each attached to the '1.x' endpoint
# that every router has for its peer.
LINK_R1_R2_NAME, LINK_R1_R2_ID, LINK_R1_R2 = compose_link('R1==R2', [(DEVICE_R1_ID, '1.2'), (DEVICE_R2_ID, '1.1')])
LINK_R2_R3_NAME, LINK_R2_R3_ID, LINK_R2_R3 = compose_link('R2==R3', [(DEVICE_R2_ID, '1.3'), (DEVICE_R3_ID, '1.2')])
LINK_R1_R3_NAME, LINK_R1_R3_ID, LINK_R1_R3 = compose_link('R1==R3', [(DEVICE_R1_ID, '1.3'), (DEVICE_R3_ID, '1.1')])
# ----- Service --------------------------------------------------------------------------------------------------------
def compose_service(
    name : str, endpoint_ids : List[Tuple[str, str]], latency_ms : float, jitter_us : float
) -> Tuple[str, Dict, Dict]:
    """Build a sample service (via ``json_service_l3nm_planned``) together with its id.

    Args:
        name: service name; used to derive the service id (within CONTEXT_ID) and
            forwarded to the service factory.
        endpoint_ids: ``(device_id, endpoint_name)`` pairs naming the service endpoints.
            Each pair is resolved into an endpoint id bound to TOPOLOGY_ID.
        latency_ms: value stored, as a string, in the 'latency[ms]' custom constraint.
        jitter_us: value stored, as a string, in the 'jitter[us]' custom constraint.

    Returns:
        A ``(name, service_id, service)`` tuple.
    """
    service_id = json_service_id(name, context_id=CONTEXT_ID)

    # Resolve the (device, endpoint-name) pairs into endpoint ids, preserving order.
    service_endpoint_ids = []
    for device_id, endpoint_name in endpoint_ids:
        service_endpoint_ids.append(
            json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID))

    # Constraint values are serialised with str() before being handed to the factory.
    latency_constraint = json_constraint_custom('latency[ms]', str(latency_ms))
    jitter_constraint = json_constraint_custom('jitter[us]', str(jitter_us))
    constraints = [latency_constraint, jitter_constraint]

    # Fixed resource-key/value pairs installed on every service, in this order.
    resource_values = (
        ('svc/rsrc1/value', 'value7'),
        ('svc/rsrc2/value', 'value8'),
        ('svc/rsrc3/value', 'value9'),
    )
    config_rules = [
        json_config_rule_set(resource_key, resource_value)
        for resource_key, resource_value in resource_values
    ]

    service = json_service_l3nm_planned(
        name, endpoint_ids=service_endpoint_ids, constraints=constraints, config_rules=config_rules)
    return name, service_id, service
# One service per router pair, attached to the '2.x' endpoint that every router has for
# its peer. The two trailing numbers are latency_ms and jitter_us (see compose_service).
SERVICE_R1_R2_NAME, SERVICE_R1_R2_ID, SERVICE_R1_R2 = compose_service(
'R1-R2', [(DEVICE_R1_ID, '2.2'), (DEVICE_R2_ID, '2.1')], 15.2, 1.2)
SERVICE_R1_R3_NAME, SERVICE_R1_R3_ID, SERVICE_R1_R3 = compose_service(
'R1-R3', [(DEVICE_R1_ID, '2.3'), (DEVICE_R3_ID, '2.1')], 5.8, 0.1)
SERVICE_R2_R3_NAME, SERVICE_R2_R3_ID, SERVICE_R2_R3 = compose_service(
'R2-R3', [(DEVICE_R2_ID, '2.3'), (DEVICE_R3_ID, '2.2')], 23.1, 3.4)
# ----- Slice ----------------------------------------------------------------------------------------------------------
def compose_slice(
    name : str, endpoint_ids : List[Tuple[str, str]], latency_ms : float, jitter_us : float,
    service_ids : Optional[List[Dict]] = None, subslice_ids : Optional[List[Dict]] = None,
    owner : Optional[Dict] = None
) -> Tuple[str, Dict, Dict]:
    """Build a sample slice together with its id.

    Args:
        name: slice name; used to derive the slice id (within CONTEXT_ID) and forwarded
            to the slice factory.
        endpoint_ids: ``(device_id, endpoint_name)`` pairs naming the slice endpoints.
            Each pair is resolved into an endpoint id bound to TOPOLOGY_ID.
        latency_ms: value stored, as a string, in the 'latency[ms]' custom constraint.
        jitter_us: value stored, as a string, in the 'jitter[us]' custom constraint.
        service_ids: ids of the services referenced by the slice. ``None`` (the default)
            means "no services" and is replaced by a fresh empty list.
        subslice_ids: ids of the sub-slices referenced by the slice. ``None`` (the
            default) means "no sub-slices" and is replaced by a fresh empty list.
        owner: optional owner descriptor forwarded unchanged to the slice factory.

    Returns:
        A ``(name, slice_id, slice)`` tuple.
    """
    # Use a new list per call: a literal `[]` default would be one shared object that ends
    # up referenced from every slice built without explicit ids.
    if service_ids is None:
        service_ids = []
    if subslice_ids is None:
        subslice_ids = []

    slice_id = json_slice_id(name, context_id=CONTEXT_ID)

    # Resolve the (device, endpoint-name) pairs into endpoint ids, preserving order.
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]

    # Constraint values are serialised with str() before being handed to the factory.
    constraints = [
        json_constraint_custom('latency[ms]', str(latency_ms)),
        json_constraint_custom('jitter[us]', str(jitter_us)),
    ]

    # NOTE(review): these keys use the 'svc/' prefix, same as compose_service -- confirm
    # whether that is intentional for slices. Left untouched because they are runtime data.
    config_rules = [
        json_config_rule_set('svc/rsrc1/value', 'value7'),
        json_config_rule_set('svc/rsrc2/value', 'value8'),
        json_config_rule_set('svc/rsrc3/value', 'value9'),
    ]

    slice_ = json_slice(
        name, context_id=CONTEXT_ID, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules,
        service_ids=service_ids, subslice_ids=subslice_ids, owner=owner)
    return name, slice_id, slice_
# Sample slice between R1/2.3 and R3/2.1 (latency 15.2 ms, jitter 1.2 us) that references
# the R1-R2 and R2-R3 services; it has no sub-slices and no owner.
SLICE_R1_R3_NAME, SLICE_R1_R3_ID, SLICE_R1_R3 = compose_slice(
'R1-R3', [(DEVICE_R1_ID, '2.3'), (DEVICE_R3_ID, '2.1')], 15.2, 1.2,
service_ids=[SERVICE_R1_R2_ID, SERVICE_R2_R3_ID],
subslice_ids=[], owner=None)
# ----- Connection -----------------------------------------------------------------------------------------------------
def compose_connection(
    name : str, service_id : Dict, endpoint_ids : List[Tuple[str, str]],
    sub_service_ids : Optional[List[Dict]] = None
) -> Tuple[str, Dict, Dict]:
    """Build a sample connection together with its id.

    Args:
        name: connection name; used to derive the connection id and forwarded to the
            connection factory.
        service_id: id of the service the connection belongs to.
        endpoint_ids: ordered ``(device_id, endpoint_name)`` pairs describing the path
            hops. Each pair is resolved into an endpoint id bound to TOPOLOGY_ID.
        sub_service_ids: ids of the sub-services referenced by the connection. ``None``
            (the default) means "no sub-services" and is replaced by a fresh empty list.

    Returns:
        A ``(name, connection_id, connection)`` tuple.
    """
    # Use a new list per call: a literal `[]` default would be one shared object that ends
    # up referenced from every connection built without explicit sub-services.
    if sub_service_ids is None:
        sub_service_ids = []

    connection_id = json_connection_id(name)

    # Resolve the (device, endpoint-name) pairs into endpoint ids, preserving hop order.
    endpoint_ids = [
        json_endpoint_id(device_id, endpoint_name, topology_id=TOPOLOGY_ID)
        for device_id, endpoint_name in endpoint_ids
    ]

    connection = json_connection(
        name, service_id=service_id, path_hops_endpoint_ids=endpoint_ids, sub_service_ids=sub_service_ids)
    return name, connection_id, connection
# Sample connection for the R1-R3 service. The hop list goes from R1/2.3 to R3/2.1 through
# R2, using the endpoint pairs of links R1==R2 and R2==R3, and references the R1-R2 and
# R2-R3 services as sub-services.
CONNECTION_R1_R3_NAME, CONNECTION_R1_R3_ID, CONNECTION_R1_R3 = compose_connection(
'CON:R1/2.3-R3/2.1', SERVICE_R1_R3_ID, [
(DEVICE_R1_ID, '2.3'),
(DEVICE_R1_ID, '1.2'), (DEVICE_R2_ID, '1.1'),
(DEVICE_R2_ID, '1.3'), (DEVICE_R3_ID, '1.2'),
(DEVICE_R3_ID, '2.1')
], sub_service_ids=[SERVICE_R1_R2_ID, SERVICE_R2_R3_ID])
# ----- PolicyRule -------------------------------------------------------------------------------------------------------
# Sample policy rule. The name is a fixed UUID-formatted string, used both to derive the
# policy rule id and to build the policy rule object.
POLICY_RULE_NAME = '56380225-3e40-4f74-9162-529f8dcb96a1'
POLICY_RULE_ID = json_policy_rule_id(POLICY_RULE_NAME)
POLICY_RULE = json_policy_rule(POLICY_RULE_NAME)