"""Read the running configuration of a NETCONF device and list its optical
bands and media channels.

The parsing helpers (``extract_channel_xmlns``, ``extract_optical_bands``,
``extract_media_channels``) work on the XML text of a configuration reply and
can be imported on their own; running the file as a script connects to the
device described by ``device`` and prints what was found.
"""

# NOTE: the wildcard import is kept first so that the explicit imports below
# win if ncclient.xml_ happens to export a name of the same spelling.
from ncclient import manager
from ncclient.xml_ import *
import lxml.etree as ET
import re
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5
import logging

# NOTE(review): these two payloads contain only bare values; they look like
# NETCONF <config> documents whose XML markup has been lost. They are only
# referenced from the commented-out edit_config call in main() -- confirm the
# intended content before using them.
create_media = ''' 1 1 C_BAND 192006250 192056250 1 111 2 '''

delete_optical_band = ''' 1 1 '''

# XML namespaces the extract_* helpers search in.
_FLEX_SCALE_NS = "http://flex-scale-project.eu/yang/flex-scale-mg-on"
_WAVELENGTH_ROUTER_NS = "http://openconfig.net/yang/wavelength-router"

# (result key, ElementPath relative to one band/channel element) for the
# fields that optical bands and media channels have in common. The order is
# the key order of the dictionaries returned to callers.
_COMMON_FIELD_PATHS = (
    ('band_name', './/oc:name'),
    ('lower_frequency', './/oc:lower-frequency'),
    ('upper_frequency', './/oc:upper-frequency'),
    ('admin_status', './/oc:admin-status'),
    ('src_port', './/oc:source/oc:config/oc:port-name'),
    ('dest_port', './/oc:dest/oc:config/oc:port-name'),
)


def _namespace_of(tag: str) -> str:
    """Return the namespace URI of a ``{uri}local`` tag, or ``""`` if it has none."""
    if tag.startswith("{"):
        return tag[1:].partition("}")[0]
    return ""


def _text_of(parent, path: str, namespaces: dict) -> Optional[str]:
    """Return the text of the first element matching *path* under *parent*, or None."""
    node = parent.find(path, namespaces)
    return node.text if node is not None else None


def _common_fields(element, namespaces: dict) -> dict:
    """Collect the fields listed in ``_COMMON_FIELD_PATHS`` from *element*."""
    return {
        key: _text_of(element, path, namespaces)
        for key, path in _COMMON_FIELD_PATHS
    }


def extract_channel_xmlns(data_xml: str, is_opticalband: bool) -> Optional[str]:
    """Return the XML namespace used by the optical elements in *data_xml*.

    Args:
        data_xml: XML document as text.
        is_opticalband: when true, look for the first ``optical-bands``
            element; otherwise look for the first ``optical-channel`` element.

    Returns:
        The namespace URI of the element that was found, ``""`` if that
        element is not namespace-qualified, or ``None`` if there is no such
        element in the document.
    """
    # Parse from bytes, as the original code did, rather than from str.
    root = ET.fromstring(data_xml.encode("utf-8"))
    local_name = "optical-bands" if is_opticalband else "optical-channel"
    # '{*}' matches the element whatever its namespace is, so no prefix map
    # is needed for this lookup.
    element = root.find('.//{*}' + local_name)
    if element is None:
        return None
    # Read the namespace out of the '{uri}local' tag instead of deleting the
    # local name from it, which would also damage a URI containing that name.
    return _namespace_of(element.tag)


def extract_optical_bands(data_xml: str,
                          namespace: Optional[Union[str, dict]] = None) -> list:
    """List the optical bands found in *data_xml*.

    Args:
        data_xml: XML document as text.
        namespace: unused; the lookup always uses the flex-scale namespace.
            Kept so existing callers that pass it keep working.

    Returns:
        One dict per ``optical-band`` element under the first
        ``optical-bands`` element, with the keys ``band_name``,
        ``lower_frequency``, ``upper_frequency``, ``admin_status``,
        ``src_port``, ``dest_port`` and ``channel_index``. Each value is the
        element text, or ``None`` when the element is absent. An empty list
        is returned when the document has no ``optical-bands`` element.
    """
    namespaces = {"oc": _FLEX_SCALE_NS}
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:optical-bands', namespaces)
    if container is None:
        return []

    bands = []
    for band in container.findall('.//oc:optical-band', namespaces):
        entry = _common_fields(band, namespaces)
        entry['channel_index'] = _text_of(band, './/oc:index', namespaces)
        bands.append(entry)
    return bands


def extract_media_channels(data_xml: str,
                           namespace: Optional[Union[str, dict]] = None) -> list:
    """List the media channels found in *data_xml*.

    Args:
        data_xml: XML document as text.
        namespace: unused; the lookup always uses the wavelength-router
            namespace (and the flex-scale namespace for the parent band).
            Kept so existing callers that pass it keep working.

    Returns:
        One dict per ``channel`` element under the first ``media-channels``
        element, with the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port``,
        ``optical_band_parent`` and ``channel_index``. Each value is the
        element text, or ``None`` when the element is absent. An empty list
        is returned when the document has no ``media-channels`` element.
    """
    namespaces = {"oc": _WAVELENGTH_ROUTER_NS, 'ob_parent': _FLEX_SCALE_NS}
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:media-channels', namespaces)
    if container is None:
        return []

    channels = []
    for channel in container.findall('.//oc:channel', namespaces):
        entry = _common_fields(channel, namespaces)
        # The parent-band reference lives in the flex-scale namespace, not in
        # the wavelength-router one used by the rest of the channel.
        entry['optical_band_parent'] = _text_of(
            channel, './/ob_parent:optical-band-parent', namespaces)
        entry['channel_index'] = _text_of(channel, './/oc:index', namespaces)
        channels.append(entry)
    return channels


# Connection settings used when the file is run as a script.
# NOTE(review): the credentials are hard-coded and SSH host-key verification
# is switched off; that looks like a lab setup -- confirm before pointing
# this at a real device.
device = {
    'host': '10.0.2.4',      # IP address or hostname of the remote machine
    'port': 2026,            # SSH port the device is reached on
    'username': 'admin',     # SSH username
    'password': 'admin',     # SSH password
    'device_params': {'name': 'default'},  # not passed to manager.connect below
    'hostkey_verify': False,
    "allow_agent": False,
    "look_for_keys": False,
}


def main() -> None:
    """Fetch the running configuration from ``device`` and print its contents."""
    with manager.connect(host=device['host'],
                         port=device['port'],
                         username=device['username'],
                         password=device['password'],
                         hostkey_verify=device['hostkey_verify'],
                         allow_agent=device['allow_agent'],
                         look_for_keys=device['look_for_keys']) as m:
        # edit_result = m.edit_config(target="running", config=delete_optical_band)
        result = m.get_config(source="running").data_xml
        # Passed through for compatibility only; the extract_* helpers use
        # their own fixed namespaces.
        namespaces = {"oc": "http://openconfig.net/yang/wavelength-router"}
        obj = extract_media_channels(result, namespaces)
        obj1 = extract_optical_bands(result, namespaces)
        # road_info = extract_openroadm_info(result)
        # circuits = extract_roadm_circuits_pack(result)
        # print(f'edit result {edit_result}')
        print(f"result {result}")
        print(f"media_cahnnels {obj}")
        print(f"optical_bands {obj1}")
        # print(f"circuits {circuits}")
        # with open("context.log", "w") as f:
        #     print(result, file=f)


if __name__ == '__main__':
    main()