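# NOTE: the words "Newer" / "Older" that appeared here were page-navigation
# labels captured together with the file; they are not part of the program.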
from ncclient import manager
from ncclient.xml_ import *
import lxml.etree as ET
import re
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5
import logging
# XML <config> payload describing one media channel under the OpenConfig
# wavelength-router model: channel index 1, name C_BAND, a lower/upper
# frequency pair, source port 111, destination port 2, and an
# <optical-band-parent> element (flex-scale namespace) set to 1.
# NOTE(review): presumably meant to be sent with edit_config; no code in this
# part of the file references it -- confirm whether it is still needed.
create_media='''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">
<media-channels>
<channel>
<index>1</index>
<config>
<index>1</index>
<name>C_BAND</name>
<lower-frequency>192006250</lower-frequency>
<upper-frequency>192056250</upper-frequency>
<optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent>
</config>
<source>
<config>
<port-name>111</port-name>
</config>
</source>
<dest>
<config>
<port-name>2</port-name>
</config>
</dest>
</channel>
</media-channels>
</wavelength-router>
</config>
'''
# XML <config> payload that marks the optical band with index 1 (flex-scale
# namespace, nested under the OpenConfig wavelength-router) with
# operation="delete".
# The only reference to it in this part of the file is a commented-out
# edit_config call in the connection section below.
delete_optical_band= '''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">
<optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">
<optical-band operation="delete">
<index>1</index>
<config>
<index>1</index>
</config>
</optical-band>
</optical-bands>
</wavelength-router>
</config>
'''
def extract_channel_xmlns(data_xml: str, is_opticalband: bool) -> Optional[str]:
    """Return the namespace URI of the first matching element in *data_xml*.

    Args:
        data_xml: XML document as text.
        is_opticalband: when true, look for the first ``optical-bands``
            element; otherwise look for the first ``optical-channel``
            element. The element is matched below the root in any namespace.

    Returns:
        The namespace URI of the matched element, an empty string when the
        matched element is not namespace-qualified, or ``None`` when no such
        element exists.

    Raises:
        Whatever ``ET.fromstring`` raises for malformed XML.
    """
    # Parse from bytes rather than str so that a leading XML declaration with
    # an encoding attribute is acceptable to the parser.
    root = ET.fromstring(data_xml.encode("utf-8"))

    local_name = "optical-bands" if is_opticalband else "optical-channel"
    element = root.find(".//{*}" + local_name)
    if element is None:
        return None

    # Tags are spelled "{namespace-uri}local-name". Take the text between the
    # braces instead of deleting the local name from the whole tag, which
    # would also mangle a URI that happens to contain the local name.
    tag = element.tag
    if not tag.startswith("{"):
        return ""
    return tag[1:].partition("}")[0]
def extract_optical_bands(data_xml: str, namespace: str):
    """Collect the optical bands described in *data_xml*.

    The first ``optical-bands`` element in the flex-scale namespace is
    located below the root, and every ``optical-band`` beneath it yields one
    record.

    Args:
        data_xml: XML document as text.
        namespace: accepted for interface compatibility but not used; the
            flex-scale namespace is fixed inside the function.

    Returns:
        A list of dicts with the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port``
        and ``channel_index``. Each value is the text of the first matching
        descendant element, or ``None`` when that element is absent. The
        list is empty when no ``optical-bands`` element is found.

    Raises:
        Whatever ``ET.fromstring`` raises for malformed XML.
    """
    namespaces = {"oc": "http://flex-scale-project.eu/yang/flex-scale-mg-on"}
    # Output key -> path evaluated relative to each <optical-band>.
    field_paths = {
        "band_name": ".//oc:name",
        "lower_frequency": ".//oc:lower-frequency",
        "upper_frequency": ".//oc:upper-frequency",
        "admin_status": ".//oc:admin-status",
        "src_port": ".//oc:source/oc:config/oc:port-name",
        "dest_port": ".//oc:dest/oc:config/oc:port-name",
        "channel_index": ".//oc:index",
    }

    # Parse from bytes so an XML declaration with an encoding is acceptable.
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find(".//oc:optical-bands", namespaces)
    if container is None:
        return []

    bands = []
    for band in container.findall(".//oc:optical-band", namespaces):
        record = {}
        for key, path in field_paths.items():
            node = band.find(path, namespaces)
            record[key] = node.text if node is not None else None
        bands.append(record)
    return bands
def extract_media_channels(data_xml: str, namespace: str):
    """Collect the media channels described in *data_xml*.

    The first ``media-channels`` element in the OpenConfig wavelength-router
    namespace is located below the root, and every ``channel`` beneath it
    yields one record.

    Args:
        data_xml: XML document as text.
        namespace: accepted for interface compatibility but not used; the
            namespaces are fixed inside the function.

    Returns:
        A list of dicts with the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port``,
        ``optical_band_parent`` and ``channel_index``. Each value is the text
        of the first matching descendant element, or ``None`` when that
        element is absent. The list is empty when no ``media-channels``
        element is found.

    Raises:
        Whatever ``ET.fromstring`` raises for malformed XML.
    """
    namespaces = {
        "oc": "http://openconfig.net/yang/wavelength-router",
        # <optical-band-parent> lives in the flex-scale namespace.
        "ob_parent": "http://flex-scale-project.eu/yang/flex-scale-mg-on",
    }
    # Output key -> path evaluated relative to each <channel>.
    field_paths = {
        "band_name": ".//oc:name",
        "lower_frequency": ".//oc:lower-frequency",
        "upper_frequency": ".//oc:upper-frequency",
        "admin_status": ".//oc:admin-status",
        "src_port": ".//oc:source/oc:config/oc:port-name",
        "dest_port": ".//oc:dest/oc:config/oc:port-name",
        "optical_band_parent": ".//ob_parent:optical-band-parent",
        "channel_index": ".//oc:index",
    }

    # Parse from bytes so an XML declaration with an encoding is acceptable.
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find(".//oc:media-channels", namespaces)
    if container is None:
        return []

    channels = []
    for channel in container.findall(".//oc:channel", namespaces):
        record = {}
        for key, path in field_paths.items():
            node = channel.find(path, namespaces)
            record[key] = node.text if node is not None else None
        channels.append(record)
    return channels
# Connection settings for the NETCONF session opened below.
# NOTE(review): the opening "device = {" line was missing from this file as
# received; the name is taken from the device[...] lookups in the connect call.
device = {
    'host': '10.0.2.4',  # IP address or hostname of the remote machine
    'port': 2026,  # SSH port (default: 22)
    'username': 'admin',  # SSH username
    'password': 'admin',  # SSH password
    'device_params': {'name': 'default'},  # defined but not passed to manager.connect below
    'hostkey_verify': False,
    "allow_agent": False,
    "look_for_keys": False,
}

with manager.connect(
    host=device['host'],
    port=device['port'],
    username=device['username'],
    password=device['password'],
    hostkey_verify=device['hostkey_verify'],
    allow_agent=device['allow_agent'],
    look_for_keys=device['look_for_keys'],
) as m:
    #edit_result = m.edit_config (target="running",config=delete_optical_band)
    # NOTE(review): `result` was used below without ever being assigned in
    # this file as received. Fetching the running configuration as XML text
    # is a reconstruction of the missing line -- confirm the intended
    # datastore/filter against the original script.
    result = m.get_config(source="running").data_xml
    optical_band_namespaces = "http://flex-scale-project.eu/yang/flex-scale-mg-on"
    namespaces = {"oc": "http://openconfig.net/yang/wavelength-router"}
    # Both extractors ignore their second argument; it is passed only to
    # satisfy their signatures.
    obj = extract_media_channels(result, namespaces)
    obj1 = extract_optical_bands(result, namespaces)
    # road_info= extract_openroadm_info(result)
    # circuits=extract_roadm_circuits_pack(result)
    #print (f'edit result {edit_result}')
    print(f"result {result}")
    print(f"media_cahnnels {obj}")
    print(f"optical_bands {obj1}")
    #print(f"circuits {circuits}")
    # with open("context.log","w") as f:
    # print (result,file=f)