# Skip to content
# Snippets Groups Projects
# test.py 4.88 KiB
# Newer Older
from ncclient import manager
from ncclient.xml_ import *
import lxml.etree as ET
import re
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5
import logging


# Mohammad Ismaeel's avatar
# Mohammad Ismaeel committed
def extract_openroadm_info(xml_data:str):
    """Collect the basic node identity leaves from an OpenROADM device XML document.

    Every ``info`` element in the ``http://org/openroadm/device`` namespace is
    visited, and for each one the text of its ``node-id``, ``node-number``,
    ``node-type`` and ``clli`` descendants is copied into the result. When
    several ``info`` elements carry the same leaf, the last one wins.

    Args:
        xml_data: The XML document as a string.

    Returns:
        A dict with the keys ``node-id``, ``node-number``, ``node-type`` and
        ``clli``; a key stays ``None`` when the leaf is not present.
    """
    ns = {'oc': "http://org/openroadm/device"}
    leaf_names = ("node-id", "node-number", "node-type", "clli")
    collected = dict.fromkeys(leaf_names)

    # Parse from bytes rather than from the str directly.
    document = ET.fromstring(xml_data.encode("utf-8"))

    for info_element in document.findall('.//oc:info', ns):
        for leaf in leaf_names:
            match = info_element.find(f'.//oc:{leaf}', ns)
            if match is None:
                continue
            collected[leaf] = match.text
    return collected
            
               
        
# Mohammad Ismaeel's avatar
# Mohammad Ismaeel committed
def _collect_leaf_text(element, fields, namespace):
    """Return ``{key: text}`` for each ``(key, path)`` in *fields* found under *element*."""
    collected = {}
    for key, path in fields:
        node = element.find(path, namespace)
        if node is not None:
            collected[key] = node.text
    return collected


def extract_roadm_circuits_pack (xml_data:str):
    """Extract every circuit pack, with its ports, from an OpenROADM device XML document.

    Args:
        xml_data: The XML document as a string.

    Returns:
        A list with one dict per ``circuit-packs`` element in the
        ``http://org/openroadm/device`` namespace. Each dict always has a
        ``ports`` key (a list of dicts holding ``port_name`` / ``port_qual``
        when present) and, for each leaf that exists, the keys
        ``circuit_name``, ``circuit_type``, ``circuit_adminstrative_status``,
        ``circuit_equipment_state``, ``circuit_mode``, ``slot`` and ``shelf``.
        An empty list is returned when the document has no circuit packs.
    """
    namespace = {'oc': "http://org/openroadm/device"}

    # (output key, lookup path) pairs. The output keys are part of the
    # function's contract and are kept exactly as before, including the
    # historical "adminstrative" spelling.
    # NOTE(review): the first three paths search all descendants (".//"),
    # the rest only direct children ("./"); kept as-is, confirm this is intended.
    circuit_fields = (
        ("circuit_name", ".//oc:circuit-pack-name"),
        ("circuit_type", ".//oc:circuit-pack-type"),
        ("circuit_adminstrative_status", ".//oc:administrative-state"),
        ("circuit_equipment_state", "./oc:equipment-state"),
        ("circuit_mode", "./oc:circuit-pack-mode"),
        ("slot", "./oc:slot"),
        ("shelf", "./oc:shelf"),
    )
    port_fields = (
        ("port_name", "./oc:port-name"),
        ("port_qual", "./oc:port-qual"),
    )

    # Parse from bytes rather than from the str directly.
    root = ET.fromstring(xml_data.encode("utf-8"))

    circuits_list = []
    for circuit in root.findall('.//oc:circuit-packs', namespace):
        # Build a fresh dict per circuit pack. Reusing a single dict across
        # iterations made every list entry alias the same object, so all
        # entries ended up showing the last circuit pack's values.
        circuit_info = _collect_leaf_text(circuit, circuit_fields, namespace)
        circuit_info["ports"] = [
            _collect_leaf_text(port, port_fields, namespace)
            for port in circuit.findall("./oc:ports", namespace)
        ]
        circuits_list.append(circuit_info)

    return circuits_list
# Connection parameters for the NETCONF session opened in the __main__ block.
device = {
    'host': '172.17.0.2',        # IP address or hostname of the remote machine
    'port': 830,                # NETCONF-over-SSH port (830, not the plain SSH port 22)
    'username': 'openroadm',    # SSH username
    'password': 'openroadm',    # SSH password
    'device_params': {'name': 'default'},
    'hostkey_verify': False,
    "allow_agent": False,
    "look_for_keys": False,
}
if __name__ == '__main__':
    # Open a NETCONF session with the settings from `device`, fetch the
    # running configuration once, and run both extractors on the same XML.
    with manager.connect(
        host=device['host'],
        port=device['port'],
        username=device['username'],
        password=device['password'],
        hostkey_verify=device['hostkey_verify'],
        allow_agent=device['allow_agent'],
        look_for_keys=device['look_for_keys'],
    ) as m:
        result = m.get_config(source="running").data_xml
        road_info = extract_openroadm_info(result)
        circuits = extract_roadm_circuits_pack(result)
        print(f"roadm_info {road_info}")
        #print(f"circuits {circuits}")
        # with open("context.log","w") as f:
        #      print (result,file=f)