Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import re,logging
import json
import lxml.etree as ET
from typing import Collection, Dict, Any
from yattag import Doc, indent
from .VPN.physical import create_optical_channel
def add_value_from_tag(target : Dict, field_name: str, field_value : ET.Element, cast=None) -> None:
    """Store the text of an XML element under ``target[field_name]``.

    Nothing is stored when ``field_value`` is ``None``, is a plain string, or
    carries no text.

    Args:
        target: Dictionary that receives the value.
        field_name: Key to write in ``target``.
        field_value: Element whose ``.text`` is stored.
        cast: Optional callable applied to the text before it is stored.
    """
    if field_value is None or isinstance(field_value, str):
        return
    text = field_value.text
    if text is None:
        return
    target[field_name] = text if cast is None else cast(text)
def add_value_from_collection(target : Dict, field_name: str, field_value : Collection) -> None:
    """Store a collection under ``target[field_name]`` unless it is ``None`` or empty.

    Args:
        target: Dictionary that receives the value.
        field_name: Key to write in ``target``.
        field_value: Collection to store; stored as-is (no copy).
    """
    has_items = field_value is not None and len(field_value) != 0
    if has_items:
        target[field_name] = field_value
"""
# Method Name: generate_templates
# Parameters:
- resource_key: [str] Variable to identify the rule to be executed.
- resource_value: [str] Variable with the configuration parameters of the rule to be executed.
- channel: [str] Name of the channel the rule applies to.
# Functionality:
This method generates the template used to configure the equipment.
To generate the template the following steps are performed:
1) Collect "channel", "resource_key" and "resource_value" into a single data dictionary.
2) Call the template builder with that dictionary.
3) Append the generated template to the list of results.
# Return:
[list] Set of templates generated according to the configuration rule
"""
def generate_templates(resource_key: str, resource_value: str, channel:str) -> list:    # template management to be configured
    """Build the configuration template(s) for one resource of a channel.

    Args:
        resource_key: Identifies the rule/parameter to configure.
        resource_value: Value to configure for ``resource_key``.
        channel: Name of the channel the rule applies to.

    Returns:
        A list holding the single template produced by the template builder.
    """
    data = {
        'name': channel,
        'resource_key': resource_key,
        'value': resource_value,
    }
    # NOTE(review): `create_physical_config` is not among the imports visible
    # at the top of this file (only `create_optical_channel` is imported from
    # .VPN.physical) -- confirm where it is defined, otherwise this call
    # raises NameError.
    return [create_physical_config(data)]
def extract_channel_xmlns (data_xml:str,is_opticalband:bool):
    """Return the XML namespace URI of the channel container element.

    The document is searched, in any namespace, for the first
    ``optical-bands`` element when ``is_opticalband`` is true, otherwise for
    the first ``optical-channel`` element.

    Args:
        data_xml: XML document as text.
        is_opticalband: Selects which element name is looked up.

    Returns:
        The namespace URI of the element found, ``""`` when that element has
        no namespace, or ``None`` when no such element exists.
    """
    root = ET.fromstring(data_xml.encode("utf-8"))
    local_name = "optical-bands" if is_opticalband else "optical-channel"
    element = root.find('.//{*}' + local_name)
    if element is None:
        return None
    tag = element.tag
    if not tag.startswith("{"):
        return ""
    # Tags are in Clark notation "{uri}local"; take the URI up to the closing
    # brace instead of str.replace() on the local name, which would also
    # strip the local name out of a URI that happens to contain it.
    return tag[1:].partition("}")[0]
def extract_channels_based_on_channelnamespace (xml_data:str,channel_namespace:str,is_opticalband:bool):
    """List the channels (or optical bands) declared in an XML document.

    Args:
        xml_data: XML document as text.
        channel_namespace: Namespace URI of the ``optical-channel`` /
            ``optical-band`` elements. ``None`` yields an empty result.
        is_opticalband: When false, platform components that have an
            ``optical-channel`` child are listed; when true, ``optical-band``
            elements are listed.

    Returns:
        A list of dicts. Without optical bands each entry is
        ``{"index": <component name>}``. With optical bands each entry is
        ``{"index": <band index>, "endpoints": (<source port>, <dest port>)}``,
        where a port name that is absent from the document is ``None``.
        Components without a name and bands without an index are skipped.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    channels = []
    if channel_namespace is None:
        # No channel element was located, so there is nothing to list.
        return channels

    if not is_opticalband:
        namespace = {'namespace': 'http://openconfig.net/yang/platform', 'cn': channel_namespace}
        # Components whose children include an "optical-channel" element.
        for component in root.findall('.//namespace:component[cn:optical-channel]', namespace):
            name_element = component.find('namespace:name', namespace)
            if name_element is None:
                continue
            channels.append({"index": name_element.text})
        return channels

    namespaces = {
        'wr': 'http://openconfig.net/yang/wavelength-router',
        'fs': channel_namespace,
    }
    for band in root.findall('.//fs:optical-band', namespaces=namespaces):
        index_element = band.find('.//fs:index', namespaces)
        if index_element is None:
            continue
        source_element = band.find('.//fs:source/fs:config/fs:port-name', namespaces)
        dest_element = band.find('.//fs:dest/fs:config/fs:port-name', namespaces)
        source_port_name = source_element.text if source_element is not None else None
        dest_port_name = dest_element.text if dest_element is not None else None
        channels.append({"index": index_element.text, "endpoints": (source_port_name, dest_port_name)})
    return channels
def extract_channels_based_on_type (xml_data:str):
    """Return the names of platform components typed as optical channels.

    A component is selected when it has a ``state/type`` descendant whose
    text is ``oc-opt-types:OPTICAL_CHANNEL``; components without a ``name``
    child are ignored.

    Args:
        xml_data: XML document as text.

    Returns:
        List of component names, in document order.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    namespace = {'oc': 'http://openconfig.net/yang/platform', 'typex': 'http://openconfig.net/yang/platform-types'}
    channel_names = []
    for component in root.findall('.//oc:component', namespace):
        type_element = component.find('.//oc:state/oc:type[.="oc-opt-types:OPTICAL_CHANNEL"]', namespaces=namespace)
        if type_element is None:
            continue
        if type_element.text != 'oc-opt-types:OPTICAL_CHANNEL':
            continue
        name_element = component.find('oc:name', namespace)
        if name_element is None:
            continue
        channel_names.append(name_element.text)
    return channel_names
def extract_value(resource_key:str,xml_data:str,dic:dict,channel_name:str,channel_namespace:str):
    """Copy one parameter of a named component from the XML into ``dic``.

    The platform component whose ``name`` equals ``channel_name`` is located
    and its first descendant ``<resource_key>`` element in
    ``channel_namespace`` is read.

    Args:
        resource_key: Local name of the element to read; also the key
            written in ``dic``.
        xml_data: XML document as text.
        dic: Dictionary updated in place with ``dic[resource_key] = text``.
        channel_name: Component name, or a channel descriptor dict of the
            form ``{"index": <name>}`` as produced by
            ``extract_channels_based_on_channelnamespace``.
        channel_namespace: Namespace URI of the parameter element.

    Returns:
        ``dic``; it is left unchanged when the component or the parameter is
        not present.
    """
    # Channel listings in this module are dicts keyed by "index"; formatting
    # such a dict into the XPath predicate below would never match a name.
    if isinstance(channel_name, dict):
        channel_name = channel_name.get("index")

    root = ET.fromstring(xml_data.encode("utf-8"))
    namespace = {'oc': 'http://openconfig.net/yang/platform',
                 'td': channel_namespace}

    element = root.find(f'.//oc:component[oc:name="{channel_name}"]', namespace)
    if element is None:
        print(" element not found.")
        return dic

    parameter = element.find(f'.//td:{resource_key}', namespace)
    if parameter is not None:
        dic[resource_key] = parameter.text
    return dic
def extract_port_value (xml_string:str,port_name:str):
    """Look up the ``onos-index`` property value of a named port component.

    Args:
        xml_string: XML document as text.
        port_name: Value of the component's ``name`` element.

    Returns:
        Tuple ``(port_name, onos_index)``. ``onos_index`` is the text of the
        property's ``value`` element, or ``None`` when the component or its
        ``onos-index`` property is not present in the document.
    """
    root = ET.fromstring(xml_string.encode("utf-8"))
    namespace = {"oc": "http://openconfig.net/yang/platform"}

    component = root.find(f".//oc:component[oc:name='{port_name}']", namespace)
    if component is None:
        return (port_name, None)

    # Select the <state> whose <name> is "onos-index", then its <value>.
    value_element = component.find(
        ".//oc:property//oc:state/oc:name[.='onos-index']/../oc:value", namespace
    )
    onos_index = value_element.text if value_element is not None else None
    return (port_name, onos_index)
def extract_tranceiver (data_xml:str,dic:dict):
    """Record the names of transceiver components under ``dic['transceiver']``.

    A component counts as a transceiver when one of its ``state`` children
    has a ``type`` equal to ``oc-platform-types:TRANSCEIVER``; the text of
    the component's ``state/name`` elements is collected.

    Args:
        data_xml: XML document as text.
        dic: Dictionary updated in place.

    Returns:
        ``dic`` with key ``'transceiver'`` set to the list of names (possibly
        empty).
    """
    root = ET.fromstring(data_xml.encode("utf-8"))
    namespaces = {
        'ns': 'urn:ietf:params:xml:ns:netconf:base:1.0',
        'oc': 'http://openconfig.net/yang/platform',
        'oc-terminal': 'http://openconfig.net/yang/terminal-device',
        'oc-platform-types': 'http://openconfig.net/yang/platform-types'
    }
    name_path = './/oc:component/oc:state/[oc:type="oc-platform-types:TRANSCEIVER"]../oc:state/oc:name'
    names = []
    for name_element in root.findall(name_path, namespaces):
        names.append(name_element.text)
    dic['transceiver'] = names
    return dic
def extract_interface (xml_data:str,dic:dict):
    """Record a summary of the first interface under ``dic['interface']``.

    Args:
        xml_data: XML document as text.
        dic: Dictionary updated in place.

    Returns:
        ``dic`` with key ``'interface'`` set to ``{}`` when the document has
        no ``interfaces/interface`` element, otherwise to a dict with keys
        ``"name"``, ``'ip'``, ``'enabled'`` and ``"prefix-length"``. A value
        is ``None`` when the corresponding element is absent.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    namespaces = {
        'oc': 'http://openconfig.net/yang/interfaces',
        'ip': 'http://openconfig.net/yang/interfaces/ip',
    }

    def text_or_none(element):
        # Elements without children are falsy, so compare against None.
        return element.text if element is not None else None

    interfaces = root.findall('.//oc:interfaces/oc:interface', namespaces)
    if len(interfaces) == 0:
        dic['interface'] = {}
        return dic

    first_interface = interfaces[0]
    # NOTE(review): the address and prefix length are the first ones found
    # anywhere in the document, not necessarily those of the first
    # interface -- confirm this is intended when several interfaces exist.
    dic['interface'] = {
        "name": text_or_none(first_interface.find('oc:name', namespaces)),
        'ip': text_or_none(root.find('.//ip:ip', namespaces)),
        'enabled': text_or_none(first_interface.find('oc:config/oc:enabled', namespaces)),
        "prefix-length": text_or_none(root.find('.//ip:prefix-length', namespaces)),
    }
    return dic
def has_opticalbands(xml_data:str):
    """Tell whether the document has a non-empty ``optical-bands`` element.

    Only the first ``optical-bands`` element (in any namespace) is inspected.

    Args:
        xml_data: XML document as text.

    Returns:
        ``True`` when that element exists and has at least one child element,
        ``False`` otherwise.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    bands = root.find('.//{*}optical-bands')
    return bands is not None and len(bands) > 0
def extractor(data_xml:str,resource_keys:list,dic:dict):
    """Collect transceivers, interface, channel settings and endpoints from XML.

    Args:
        data_xml: XML document as text.
        resource_keys: Parameter names to read for every channel; the key
            ``'interface'`` is skipped in the per-channel lookup.
        dic: Not read; a fresh dictionary is built for every channel.

    Returns:
        A five-element list:
        ``[transceivers, interface, channel_settings, channel_namespace, endpoints]``.
        When the document has optical bands, ``channel_settings`` is empty
        and ``endpoints`` is the band listing returned by
        ``extract_channels_based_on_channelnamespace``. Otherwise there is
        one settings dict and one ``{"endpoint_uuid": {"uuid": ...}}`` entry
        per listed channel.
    """
    is_opticalband = has_opticalbands(xml_data=data_xml)
    channel_namespace = extract_channel_xmlns(data_xml=data_xml, is_opticalband=is_opticalband)
    channel_names = extract_channels_based_on_channelnamespace(
        xml_data=data_xml,
        channel_namespace=channel_namespace,
        is_opticalband=is_opticalband,
    )

    channel_settings = []
    if is_opticalband:
        endpoints = channel_names
    else:
        endpoints = []
        # Each entry is stored as-is (whatever the channel listing returned)
        # both as the settings "name" and as the endpoint uuid.
        for channel_name in channel_names:
            settings = {}
            for resource_key in resource_keys:
                if resource_key == 'interface':
                    continue
                settings = extract_value(
                    dic=settings,
                    resource_key=resource_key,
                    xml_data=data_xml,
                    channel_name=channel_name,
                    channel_namespace=channel_namespace,
                )
            settings["name"] = channel_name
            endpoints.append({"endpoint_uuid": {"uuid": channel_name}})
            channel_settings.append(settings)

    transceivers_dic = extract_tranceiver(data_xml=data_xml, dic={})
    interfaces_dic = extract_interface(xml_data=data_xml, dic={})
    return [transceivers_dic, interfaces_dic, channel_settings, channel_namespace, endpoints]