Loading test.py +157 −95 Original line number Diff line number Diff line Loading @@ -7,107 +7,158 @@ from uuid import UUID, uuid4, uuid5 import logging def extract_openroadm_info(xml_data:str): roadm_info={"node-id":None,"node-number":None,"node-type":None,'clli':None} xml_bytes = xml_data.encode("utf-8") create_media=''' <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router"> <media-channels> <channel> <index>1</index> <config> <index>1</index> <name>C_BAND</name> <lower-frequency>192006250</lower-frequency> <upper-frequency>192056250</upper-frequency> <optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent> </config> <source> <config> <port-name>111</port-name> </config> </source> <dest> <config> <port-name>2</port-name> </config> </dest> </channel> </media-channels> </wavelength-router> </config> ''' delete_optical_band= ''' <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router"> <optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on"> <optical-band operation="delete"> <index>1</index> <config> <index>1</index> </config> </optical-band> </optical-bands> </wavelength-router> </config> ''' def extract_channel_xmlns (data_xml:str,is_opticalband:bool): xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) namespace = {'oc': "http://org/openroadm/device"} info = root.findall('.//oc:info',namespace) if info is not None : for i in info : node_id= i.find('.//oc:node-id',namespace) node_number= i.find('.//oc:node-number',namespace) node_type=i.find('.//oc:node-type',namespace) clli=i.find('.//oc:clli',namespace) if (node_id is not None): roadm_info['node-id']=node_id.text if (node_number is not None): roadm_info['node-number']=node_number.text if (node_type is not None): roadm_info['node-type']=node_type.text if (clli is not None): roadm_info['clli']=clli.text return roadm_info namespace=None channels=None if (not is_opticalband) : optical_channel_namespaces = { 'ns': 'urn:ietf:params:xml:ns:netconf:base:1.0', 'oc': 'http://openconfig.net/yang/platform', } def extract_roadm_circuits_pack (xml_data:str): xml_bytes = xml_data.encode("utf-8") root = ET.fromstring(xml_bytes) # with open('xml.log', 'w') as f: # print(xml_bytes, file=f) namespace = {'oc': "http://org/openroadm/device"} circuits = root.findall('.//oc:circuit-packs',namespace) circuits_list =[] # print(f"component {components}") if (circuits is not None): circuit_info ={} for circuit in circuits: circuit_name = circuit.find(".//oc:circuit-pack-name",namespace) circuit_type=circuit.find(".//oc:circuit-pack-type",namespace) circuit_adminstrative_status=circuit.find(".//oc:administrative-state",namespace) circuit_equipment_state=circuit.find("./oc:equipment-state",namespace) circuit_mode=circuit.find("./oc:circuit-pack-mode",namespace) slot= circuit.find("./oc:slot",namespace) shelf= circuit.find("./oc:shelf",namespace) ports = circuit.findall("./oc:ports",namespace) circuit_ports=[] if (ports is not None): for port in ports : port_info={} port_name=port.find('./oc:port-name',namespace) port_qual= port.find("./oc:port-qual",namespace) if port_name is not None : port_info["port_name"]=port_name.text if port_qual is not None : port_info["port_qual"]=port_qual.text circuit_ports.append(port_info) if (circuit_name is not None): circuit_info["circuit_name"]=circuit_name.text if (circuit_type is not None): circuit_info["circuit_type"]=circuit_type.text if (circuit_adminstrative_status is not None): circuit_info["circuit_adminstrative_status"]=circuit_adminstrative_status.text if (circuit_equipment_state is not None): circuit_info["circuit_equipment_state"]=circuit_equipment_state.text if (circuit_mode is not None): circuit_info["circuit_mode"]=circuit_mode.text if (slot is not None): circuit_info["slot"]=slot.text if (shelf is not None): circuit_info["shelf"]=shelf.text circuit_info["ports"]=circuit_ports circuits_list.append(circuit_info) channels= root.find('.//{*}optical-channel',optical_channel_namespaces) if channels is not None : optical_channel_namespace = channels.tag.replace("optical-channel", "") namespace=optical_channel_namespace.replace("{", "").replace("}", "") else : optical_band_namespaces= { 'oc':'http://openconfig.net/yang/wavelength-router' } channels= root.find('.//{*}optical-bands',optical_band_namespaces) if channels is not None: optical_channel_namespace = channels.tag.replace("optical-bands", "") namespace=optical_channel_namespace.replace("{", "").replace("}", "") return circuits_list return namespace def extract_optical_bands (data_xml:str,namespace:str): namespaces={"oc":"http://flex-scale-project.eu/yang/flex-scale-mg-on"} xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) optical_bands= root.find('.//oc:optical-bands',namespaces) op_bands=[] if optical_bands is not None : optical_bands_ele= optical_bands.findall('.//oc:optical-band',namespaces) for optical_band in optical_bands_ele: band_ele=optical_band.find('.//oc:name',namespaces) lower_freq_ele=optical_band.find('.//oc:lower-frequency',namespaces) upper_freq_ele=optical_band.find('.//oc:upper-frequency',namespaces) admin_status_ele=optical_band.find('.//oc:admin-status',namespaces) source_ele=optical_band.find('.//oc:source/oc:config/oc:port-name',namespaces) dest_ele=optical_band.find('.//oc:dest/oc:config/oc:port-name',namespaces) channel_index= optical_band.find('.//oc:index',namespaces) op_band_obj={ 'band_name':band_ele.text if band_ele is not None else None, 'lower_frequency':lower_freq_ele.text if lower_freq_ele is not None else None, 'upper_frequency':upper_freq_ele.text if upper_freq_ele is not None else None, 'admin_status':admin_status_ele.text if admin_status_ele is not None else None, 'src_port':source_ele.text if source_ele is not None else None, 'dest_port':dest_ele.text if dest_ele is not None else None, "channel_index":channel_index.text if channel_index is not None else None } op_bands.append(op_band_obj) return op_bands def extract_media_channels (data_xml:str,namespace:str): optical_band_namespaces="http://flex-scale-project.eu/yang/flex-scale-mg-on" namespaces={"oc":"http://openconfig.net/yang/wavelength-router",'ob_parent':optical_band_namespaces} xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) media_channels= root.find(f'.//oc:media-channels',namespaces) op_bands=[] if media_channels is not None : media_channels_ele= media_channels.findall('.//oc:channel',namespaces) for optical_band in media_channels_ele: band_ele=optical_band.find('.//oc:name',namespaces) lower_freq_ele=optical_band.find('.//oc:lower-frequency',namespaces) upper_freq_ele=optical_band.find('.//oc:upper-frequency',namespaces) admin_status_ele=optical_band.find('.//oc:admin-status',namespaces) source_ele=optical_band.find('.//oc:source/oc:config/oc:port-name',namespaces) dest_ele=optical_band.find('.//oc:dest/oc:config/oc:port-name',namespaces) ob_parent=optical_band.find('.//ob_parent:optical-band-parent',namespaces) channel_index= optical_band.find('.//oc:index',namespaces) op_band_obj={ 'band_name':band_ele.text if band_ele is not None else None, 'lower_frequency':lower_freq_ele.text if lower_freq_ele is not None else None, 'upper_frequency':upper_freq_ele.text if upper_freq_ele is not None else None, 'admin_status':admin_status_ele.text if admin_status_ele is not None else None, 'src_port':source_ele.text if source_ele is not None else None, 'dest_port':dest_ele.text if dest_ele is not None else None, 'optical_band_parent':ob_parent.text if ob_parent is not None else None, 'channel_index':channel_index.text if channel_index is not None else None } op_bands.append(op_band_obj) return op_bands device = { 'host': '172.17.0.2', # IP address or hostname of the remote machine 'port': 830, # SSH port (default: 22) 'username': 'openroadm', # SSH username 'password': 'openroadm', # SSH password 'host': '10.0.2.4', # IP address or hostname of the remote machine 'port': 2026, # SSH port (default: 22) 'username': 'admin', # SSH username 'password': 'admin', # SSH password 'device_params': {'name': 'default'}, 'hostkey_verify':False, "allow_agent":False Loading @@ -125,11 +176,22 @@ if __name__ == '__main__': ,hostkey_verify=device['hostkey_verify'] ,allow_agent=device['allow_agent'] ,look_for_keys=device['look_for_keys']) as m : #edit_result = m.edit_config (target="running",config=delete_optical_band) result = m.get_config (source="running").data_xml road_info= extract_openroadm_info(result) circuits=extract_roadm_circuits_pack(result) print(f"roadm_info {road_info}") optical_band_namespaces="http://flex-scale-project.eu/yang/flex-scale-mg-on" namespaces={"oc":"http://openconfig.net/yang/wavelength-router"} obj=extract_media_channels(result,namespaces) obj1=extract_optical_bands(result,namespaces) # road_info= extract_openroadm_info(result) # circuits=extract_roadm_circuits_pack(result) #print (f'edit result {edit_result}') print(f"result {result}") print(f"media_cahnnels {obj}") print(f"optical_bands {obj1}") #print(f"circuits {circuits}") # with open("context.log","w") as f: # print (result,file=f) Loading
test.py +157 −95 Original line number Diff line number Diff line Loading @@ -7,107 +7,158 @@ from uuid import UUID, uuid4, uuid5 import logging def extract_openroadm_info(xml_data:str): roadm_info={"node-id":None,"node-number":None,"node-type":None,'clli':None} xml_bytes = xml_data.encode("utf-8") create_media=''' <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router"> <media-channels> <channel> <index>1</index> <config> <index>1</index> <name>C_BAND</name> <lower-frequency>192006250</lower-frequency> <upper-frequency>192056250</upper-frequency> <optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent> </config> <source> <config> <port-name>111</port-name> </config> </source> <dest> <config> <port-name>2</port-name> </config> </dest> </channel> </media-channels> </wavelength-router> </config> ''' delete_optical_band= ''' <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router"> <optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on"> <optical-band operation="delete"> <index>1</index> <config> <index>1</index> </config> </optical-band> </optical-bands> </wavelength-router> </config> ''' def extract_channel_xmlns (data_xml:str,is_opticalband:bool): xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) namespace = {'oc': "http://org/openroadm/device"} info = root.findall('.//oc:info',namespace) if info is not None : for i in info : node_id= i.find('.//oc:node-id',namespace) node_number= i.find('.//oc:node-number',namespace) node_type=i.find('.//oc:node-type',namespace) clli=i.find('.//oc:clli',namespace) if (node_id is not None): roadm_info['node-id']=node_id.text if (node_number is not None): roadm_info['node-number']=node_number.text if (node_type is not None): roadm_info['node-type']=node_type.text if (clli is not None): roadm_info['clli']=clli.text return roadm_info namespace=None channels=None if (not is_opticalband) : optical_channel_namespaces = { 'ns': 'urn:ietf:params:xml:ns:netconf:base:1.0', 'oc': 'http://openconfig.net/yang/platform', } def extract_roadm_circuits_pack (xml_data:str): xml_bytes = xml_data.encode("utf-8") root = ET.fromstring(xml_bytes) # with open('xml.log', 'w') as f: # print(xml_bytes, file=f) namespace = {'oc': "http://org/openroadm/device"} circuits = root.findall('.//oc:circuit-packs',namespace) circuits_list =[] # print(f"component {components}") if (circuits is not None): circuit_info ={} for circuit in circuits: circuit_name = circuit.find(".//oc:circuit-pack-name",namespace) circuit_type=circuit.find(".//oc:circuit-pack-type",namespace) circuit_adminstrative_status=circuit.find(".//oc:administrative-state",namespace) circuit_equipment_state=circuit.find("./oc:equipment-state",namespace) circuit_mode=circuit.find("./oc:circuit-pack-mode",namespace) slot= circuit.find("./oc:slot",namespace) shelf= circuit.find("./oc:shelf",namespace) ports = circuit.findall("./oc:ports",namespace) circuit_ports=[] if (ports is not None): for port in ports : port_info={} port_name=port.find('./oc:port-name',namespace) port_qual= port.find("./oc:port-qual",namespace) if port_name is not None : port_info["port_name"]=port_name.text if port_qual is not None : port_info["port_qual"]=port_qual.text circuit_ports.append(port_info) if (circuit_name is not None): circuit_info["circuit_name"]=circuit_name.text if (circuit_type is not None): circuit_info["circuit_type"]=circuit_type.text if (circuit_adminstrative_status is not None): circuit_info["circuit_adminstrative_status"]=circuit_adminstrative_status.text if (circuit_equipment_state is not None): circuit_info["circuit_equipment_state"]=circuit_equipment_state.text if (circuit_mode is not None): circuit_info["circuit_mode"]=circuit_mode.text if (slot is not None): circuit_info["slot"]=slot.text if (shelf is not None): circuit_info["shelf"]=shelf.text circuit_info["ports"]=circuit_ports circuits_list.append(circuit_info) channels= root.find('.//{*}optical-channel',optical_channel_namespaces) if channels is not None : optical_channel_namespace = channels.tag.replace("optical-channel", "") namespace=optical_channel_namespace.replace("{", "").replace("}", "") else : optical_band_namespaces= { 'oc':'http://openconfig.net/yang/wavelength-router' } channels= root.find('.//{*}optical-bands',optical_band_namespaces) if channels is not None: optical_channel_namespace = channels.tag.replace("optical-bands", "") namespace=optical_channel_namespace.replace("{", "").replace("}", "") return circuits_list return namespace def extract_optical_bands (data_xml:str,namespace:str): namespaces={"oc":"http://flex-scale-project.eu/yang/flex-scale-mg-on"} xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) optical_bands= root.find('.//oc:optical-bands',namespaces) op_bands=[] if optical_bands is not None : optical_bands_ele= optical_bands.findall('.//oc:optical-band',namespaces) for optical_band in optical_bands_ele: band_ele=optical_band.find('.//oc:name',namespaces) lower_freq_ele=optical_band.find('.//oc:lower-frequency',namespaces) upper_freq_ele=optical_band.find('.//oc:upper-frequency',namespaces) admin_status_ele=optical_band.find('.//oc:admin-status',namespaces) source_ele=optical_band.find('.//oc:source/oc:config/oc:port-name',namespaces) dest_ele=optical_band.find('.//oc:dest/oc:config/oc:port-name',namespaces) channel_index= optical_band.find('.//oc:index',namespaces) op_band_obj={ 'band_name':band_ele.text if band_ele is not None else None, 'lower_frequency':lower_freq_ele.text if lower_freq_ele is not None else None, 'upper_frequency':upper_freq_ele.text if upper_freq_ele is not None else None, 'admin_status':admin_status_ele.text if admin_status_ele is not None else None, 'src_port':source_ele.text if source_ele is not None else None, 'dest_port':dest_ele.text if dest_ele is not None else None, "channel_index":channel_index.text if channel_index is not None else None } op_bands.append(op_band_obj) return op_bands def extract_media_channels (data_xml:str,namespace:str): optical_band_namespaces="http://flex-scale-project.eu/yang/flex-scale-mg-on" namespaces={"oc":"http://openconfig.net/yang/wavelength-router",'ob_parent':optical_band_namespaces} xml_bytes = data_xml.encode("utf-8") root = ET.fromstring(xml_bytes) media_channels= root.find(f'.//oc:media-channels',namespaces) op_bands=[] if media_channels is not None : media_channels_ele= media_channels.findall('.//oc:channel',namespaces) for optical_band in media_channels_ele: band_ele=optical_band.find('.//oc:name',namespaces) lower_freq_ele=optical_band.find('.//oc:lower-frequency',namespaces) upper_freq_ele=optical_band.find('.//oc:upper-frequency',namespaces) admin_status_ele=optical_band.find('.//oc:admin-status',namespaces) source_ele=optical_band.find('.//oc:source/oc:config/oc:port-name',namespaces) dest_ele=optical_band.find('.//oc:dest/oc:config/oc:port-name',namespaces) ob_parent=optical_band.find('.//ob_parent:optical-band-parent',namespaces) channel_index= optical_band.find('.//oc:index',namespaces) op_band_obj={ 'band_name':band_ele.text if band_ele is not None else None, 'lower_frequency':lower_freq_ele.text if lower_freq_ele is not None else None, 'upper_frequency':upper_freq_ele.text if upper_freq_ele is not None else None, 'admin_status':admin_status_ele.text if admin_status_ele is not None else None, 'src_port':source_ele.text if source_ele is not None else None, 'dest_port':dest_ele.text if dest_ele is not None else None, 'optical_band_parent':ob_parent.text if ob_parent is not None else None, 'channel_index':channel_index.text if channel_index is not None else None } op_bands.append(op_band_obj) return op_bands device = { 'host': '172.17.0.2', # IP address or hostname of the remote machine 'port': 830, # SSH port (default: 22) 'username': 'openroadm', # SSH username 'password': 'openroadm', # SSH password 'host': '10.0.2.4', # IP address or hostname of the remote machine 'port': 2026, # SSH port (default: 22) 'username': 'admin', # SSH username 'password': 'admin', # SSH password 'device_params': {'name': 'default'}, 'hostkey_verify':False, "allow_agent":False Loading @@ -125,11 +176,22 @@ if __name__ == '__main__': ,hostkey_verify=device['hostkey_verify'] ,allow_agent=device['allow_agent'] ,look_for_keys=device['look_for_keys']) as m : #edit_result = m.edit_config (target="running",config=delete_optical_band) result = m.get_config (source="running").data_xml road_info= extract_openroadm_info(result) circuits=extract_roadm_circuits_pack(result) print(f"roadm_info {road_info}") optical_band_namespaces="http://flex-scale-project.eu/yang/flex-scale-mg-on" namespaces={"oc":"http://openconfig.net/yang/wavelength-router"} obj=extract_media_channels(result,namespaces) obj1=extract_optical_bands(result,namespaces) # road_info= extract_openroadm_info(result) # circuits=extract_roadm_circuits_pack(result) #print (f'edit result {edit_result}') print(f"result {result}") print(f"media_cahnnels {obj}") print(f"optical_bands {obj1}") #print(f"circuits {circuits}") # with open("context.log","w") as f: # print (result,file=f)