# test.py -- manual NETCONF check script built on ncclient.
from ncclient import manager
from ncclient.xml_ import *
import lxml.etree as ET
import re
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5
import logging


# NETCONF <edit-config> payload: creates media channel 1 (named "C_BAND",
# lower/upper frequency 192006250/192056250) from port 111 to port 2 and links
# it to optical band 1 through the flex-scale `optical-band-parent` leaf.
create_media = '''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">
    <media-channels>
     <channel>
       <index>1</index>
       <config>
        <index>1</index>
        <name>C_BAND</name>
        <lower-frequency>192006250</lower-frequency>
        <upper-frequency>192056250</upper-frequency>
        <optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent>
       </config>
       <source>
         <config>
          <port-name>111</port-name>
         </config>
       </source>
       <dest>
        <config>
         <port-name>2</port-name>
        </config>
       </dest>
      </channel>
   </media-channels>
  </wavelength-router>
</config>
'''
# NETCONF <edit-config> payload: deletes optical band 1 (flex-scale
# augmentation of the OpenConfig wavelength-router) via operation="delete".
delete_optical_band = '''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> 
 <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">   
   <optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">     
      <optical-band operation="delete">      
        <index>1</index>        
        <config>          
          <index>1</index>   
        </config>      
      </optical-band>    
   </optical-bands>  
 </wavelength-router>
</config>
'''
def extract_channel_xmlns(data_xml: str, is_opticalband: bool):
    """Return the namespace URI of the first matching container element.

    Args:
        data_xml: XML document as text.
        is_opticalband: when true, search for an ``optical-bands`` element;
            otherwise search for an ``optical-channel`` element.

    Returns:
        The namespace URI of the first descendant with that local name
        (whatever namespace it is in), ``""`` when that element carries no
        namespace, or ``None`` when no such element exists.
    """
    local_name = "optical-bands" if is_opticalband else "optical-channel"

    # Parse bytes rather than str: lxml refuses text input that carries an
    # XML encoding declaration.
    root = ET.fromstring(data_xml.encode("utf-8"))

    # "{*}" matches the local name in any namespace, so no prefix map is needed.
    element = root.find(f".//{{*}}{local_name}")
    if element is None:
        return None

    # Tags are in Clark notation, "{uri}local". Split on the closing brace
    # instead of deleting the local name from the whole tag, which would also
    # mangle a URI that happens to contain that name.
    uri, brace, _ = element.tag.rpartition("}")
    return uri[1:] if brace else ""
  
def extract_optical_bands(data_xml: str, namespace: str):
    """Collect the optical bands found in an XML document.

    Only the first ``optical-bands`` container (flex-scale namespace) is
    inspected; every ``optical-band`` beneath it yields one dictionary.

    Args:
        data_xml: XML document as text.
        namespace: unused; kept so existing callers keep working. The
            flex-scale namespace is always used for the lookups.

    Returns:
        A list of dicts with the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port`` and
        ``channel_index``. Each value is the element text, or ``None`` when
        the element is absent. The list is empty when there is no
        ``optical-bands`` container.
    """
    namespaces = {"oc": "http://flex-scale-project.eu/yang/flex-scale-mg-on"}

    # Result key -> path of the element, relative to one <optical-band>.
    field_paths = {
        'band_name': './/oc:name',
        'lower_frequency': './/oc:lower-frequency',
        'upper_frequency': './/oc:upper-frequency',
        'admin_status': './/oc:admin-status',
        'src_port': './/oc:source/oc:config/oc:port-name',
        'dest_port': './/oc:dest/oc:config/oc:port-name',
        'channel_index': './/oc:index',
    }

    # Parse bytes rather than str: lxml refuses text input that carries an
    # XML encoding declaration.
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:optical-bands', namespaces)
    if container is None:
        return []

    op_bands = []
    for band in container.findall('.//oc:optical-band', namespaces):
        entry = {}
        for key, path in field_paths.items():
            node = band.find(path, namespaces)
            entry[key] = node.text if node is not None else None
        op_bands.append(entry)
    return op_bands
        
def extract_media_channels(data_xml: str, namespace: str):
    """Collect the media channels found in an XML document.

    Only the first ``media-channels`` container (OpenConfig wavelength-router
    namespace) is inspected; every ``channel`` beneath it yields one
    dictionary.

    Args:
        data_xml: XML document as text.
        namespace: unused; kept so existing callers keep working. The
            namespaces used for the lookups are fixed inside the function.

    Returns:
        A list of dicts with the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port``,
        ``optical_band_parent`` and ``channel_index``. Each value is the
        element text, or ``None`` when the element is absent. The list is
        empty when there is no ``media-channels`` container.
    """
    namespaces = {
        "oc": "http://openconfig.net/yang/wavelength-router",
        # optical-band-parent lives in the flex-scale namespace.
        'ob_parent': "http://flex-scale-project.eu/yang/flex-scale-mg-on",
    }

    # Result key -> path of the element, relative to one <channel>.
    field_paths = {
        'band_name': './/oc:name',
        'lower_frequency': './/oc:lower-frequency',
        'upper_frequency': './/oc:upper-frequency',
        'admin_status': './/oc:admin-status',
        'src_port': './/oc:source/oc:config/oc:port-name',
        'dest_port': './/oc:dest/oc:config/oc:port-name',
        'optical_band_parent': './/ob_parent:optical-band-parent',
        'channel_index': './/oc:index',
    }

    # Parse bytes rather than str: lxml refuses text input that carries an
    # XML encoding declaration.
    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:media-channels', namespaces)
    if container is None:
        return []

    op_bands = []
    for channel in container.findall('.//oc:channel', namespaces):
        entry = {}
        for key, path in field_paths.items():
            node = channel.find(path, namespaces)
            entry[key] = node.text if node is not None else None
        op_bands.append(entry)
    return op_bands
              
        
# Connection settings for the NETCONF server this script talks to.
# NOTE(review): host and credentials are hard-coded; move them to environment
# variables or a config file before using this outside a lab setup.
device = {
    'host': '10.0.2.4',        # IP address or hostname of the remote machine
    'port': 2026,              # NETCONF-over-SSH port of the device
    'username': 'admin',       # SSH username
    'password': 'admin',       # SSH password
    'device_params': {'name': 'default'},  # ncclient device handler
    'hostkey_verify': False,   # do not check the server key against known_hosts
    'allow_agent': False,      # do not query an SSH agent for keys
    'look_for_keys': False,    # do not search the usual key file locations
}
if __name__ == '__main__':
    # Namespace URIs handed to the extractors, which declare a `str` argument.
    wavelength_router_ns = "http://openconfig.net/yang/wavelength-router"
    optical_band_ns = "http://flex-scale-project.eu/yang/flex-scale-mg-on"

    # Every key of `device` is a manager.connect() keyword, so the dict is
    # forwarded whole; this also passes `device_params` along.
    with manager.connect(**device) as m:
        # To push a payload before reading the config back, e.g.:
        #   m.edit_config(target="running", config=delete_optical_band)
        result = m.get_config(source="running").data_xml

    # Parsing needs only the reply text, so the session is already closed here.
    media_channels = extract_media_channels(result, wavelength_router_ns)
    optical_bands = extract_optical_bands(result, optical_band_ns)

    print(f"result {result}")
    print(f"media_cahnnels {media_channels}")
    print(f"optical_bands {optical_bands}")