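# NOTE: the lines below are web-page residue from the repository viewer, kept as comments.
# Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -
#
# Skip to content
# test.py 7.75 KiB
# Newer Older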
from ncclient import manager
from ncclient.xml_ import *
import lxml.etree as ET
# (repository-viewer blame annotation: Mohammad Ismaeel committed)
import re
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5
import logging


# (repository-viewer blame annotation: Mohammad Ismaeel committed)
# NETCONF <config> payload (base:1.0 namespace) describing one media channel
# under the OpenConfig wavelength-router tree: channel index 1, named C_BAND,
# with lower/upper frequency bounds, a source port-name of 111, a destination
# port-name of 2, and an <optical-band-parent> (flex-scale namespace) of 1.
# NOTE(review): not referenced anywhere else in the visible part of this file;
# presumably intended for an edit_config call - confirm.
create_media='''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">
    <media-channels>
     <channel>
       <index>1</index>
       <config>
        <index>1</index>
        <name>C_BAND</name>
        <lower-frequency>192006250</lower-frequency>
        <upper-frequency>192056250</upper-frequency>
        <optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent>
       </config>
       <source>
         <config>
          <port-name>111</port-name>
         </config>
       </source>
       <dest>
        <config>
         <port-name>2</port-name>
        </config>
       </dest>
      </channel>
   </media-channels>
  </wavelength-router>
</config>
'''
# (repository-viewer blame annotation: Mohammad Ismaeel committed)
# NETCONF <config> payload (base:1.0 namespace) that marks the <optical-band>
# entry with index 1 (flex-scale namespace, under wavelength-router) with
# operation="delete".
# In the visible part of this file it is only referenced by a commented-out
# m.edit_config(target="running", config=delete_optical_band) call.
delete_optical_band= '''
<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> 
 <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">   
   <optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">     
      <optical-band operation="delete">      
        <index>1</index>        
        <config>          
          <index>1</index>   
        </config>      
      </optical-band>    
   </optical-bands>  
 </wavelength-router>
</config>
'''
# (repository-viewer blame annotation: Mohammad Ismaeel committed)
def extract_channel_xmlns(data_xml: str, is_opticalband: bool) -> Optional[str]:
    """Return the namespace URI of the first channel/band container element.

    The document is searched, ignoring namespaces, for the first descendant
    named ``optical-bands`` (when *is_opticalband* is true) or
    ``optical-channel`` (otherwise), and the namespace that element is
    declared in is returned.

    Args:
        data_xml: XML document as text.
        is_opticalband: Selects which element name to look for.

    Returns:
        The namespace URI of the matched element, ``""`` if the element
        exists but has no namespace, or ``None`` if no such element exists.
    """
    # Encode first so that a document carrying an XML encoding declaration
    # is handed to the parser as bytes.
    root = ET.fromstring(data_xml.encode("utf-8"))

    local_name = "optical-bands" if is_opticalband else "optical-channel"
    # "{*}" matches the element in any namespace, so no prefix map is needed.
    element = root.find(f".//{{*}}{local_name}")
    if element is None:
        return None

    # Tags are spelled "{namespace-uri}local-name".  Split on the closing
    # brace instead of deleting the local name from the whole tag, which
    # would also mangle a namespace URI that happens to contain that name.
    tag = element.tag
    if tag.startswith("{"):
        namespace, _, _ = tag[1:].partition("}")
        return namespace
    return ""
  
def extract_optical_bands(data_xml: str, namespace: str):
    """Collect the optical bands listed in a configuration document.

    The first ``optical-bands`` element in the flex-scale namespace is
    located, and one dictionary is produced for every ``optical-band``
    element beneath it.

    Args:
        data_xml: XML document as text.
        namespace: Not used by this function; kept so existing callers
            keep working.

    Returns:
        A list of dictionaries with the keys ``band_name``,
        ``lower_frequency``, ``upper_frequency``, ``admin_status``,
        ``src_port``, ``dest_port`` and ``channel_index``.  Each value is the
        text of the first matching descendant element, or ``None`` when the
        element is absent.  The list is empty when the document has no
        ``optical-bands`` element.
    """
    namespaces = {"oc": "http://flex-scale-project.eu/yang/flex-scale-mg-on"}
    # Output key -> path (relative to one <optical-band>) of the element
    # whose text supplies the value.
    field_paths = {
        'band_name': './/oc:name',
        'lower_frequency': './/oc:lower-frequency',
        'upper_frequency': './/oc:upper-frequency',
        'admin_status': './/oc:admin-status',
        'src_port': './/oc:source/oc:config/oc:port-name',
        'dest_port': './/oc:dest/oc:config/oc:port-name',
        'channel_index': './/oc:index',
    }

    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:optical-bands', namespaces)
    if container is None:
        return []

    bands = []
    for band in container.findall('.//oc:optical-band', namespaces):
        record = {}
        for key, path in field_paths.items():
            node = band.find(path, namespaces)
            record[key] = node.text if node is not None else None
        bands.append(record)
    return bands
        
def extract_media_channels(data_xml: str, namespace: str):
    """Collect the media channels listed in a configuration document.

    The first ``media-channels`` element in the OpenConfig wavelength-router
    namespace is located, and one dictionary is produced for every
    ``channel`` element beneath it.

    Args:
        data_xml: XML document as text.
        namespace: Not used by this function; kept so existing callers
            keep working.

    Returns:
        A list of dictionaries with the keys ``band_name``,
        ``lower_frequency``, ``upper_frequency``, ``admin_status``,
        ``src_port``, ``dest_port``, ``optical_band_parent`` and
        ``channel_index``.  Each value is the text of the first matching
        descendant element, or ``None`` when the element is absent.  The list
        is empty when the document has no ``media-channels`` element.
    """
    namespaces = {
        "oc": "http://openconfig.net/yang/wavelength-router",
        # <optical-band-parent> lives in the flex-scale namespace rather
        # than in the wavelength-router one.
        'ob_parent': "http://flex-scale-project.eu/yang/flex-scale-mg-on",
    }
    # Output key -> path (relative to one <channel>) of the element whose
    # text supplies the value.
    field_paths = {
        'band_name': './/oc:name',
        'lower_frequency': './/oc:lower-frequency',
        'upper_frequency': './/oc:upper-frequency',
        'admin_status': './/oc:admin-status',
        'src_port': './/oc:source/oc:config/oc:port-name',
        'dest_port': './/oc:dest/oc:config/oc:port-name',
        'optical_band_parent': './/ob_parent:optical-band-parent',
        'channel_index': './/oc:index',
    }

    root = ET.fromstring(data_xml.encode("utf-8"))
    container = root.find('.//oc:media-channels', namespaces)
    if container is None:
        return []

    channels = []
    for channel in container.findall('.//oc:channel', namespaces):
        record = {}
        for key, path in field_paths.items():
            node = channel.find(path, namespaces)
            record[key] = node.text if node is not None else None
        channels.append(record)
    return channels
              
        

# Connection settings for the NETCONF server this script talks to.
# NOTE(review): the address and credentials are hard-coded; fine for a lab
# test script, but they should come from configuration or the environment
# before this is used anywhere else.
device = {
    'host': '10.0.2.4',        # IP address or hostname of the remote machine
    'port': 2026,              # SSH port (default: 22)
    'username': 'admin',       # SSH username
    'password': 'admin',       # SSH password
    # NOTE(review): not forwarded to manager.connect() by the visible
    # __main__ block - confirm whether it is still needed.
    'device_params': {'name': 'default'},
    'hostkey_verify': False,   # server host key is NOT checked
    "allow_agent": False,
    "look_for_keys": False,
}
if __name__ == '__main__':
    # Open a NETCONF session using the settings in `device`, fetch the
    # running configuration, and print it together with the media channels
    # and optical bands parsed out of it.
    with manager.connect(
        host=device['host'],
        port=device['port'],
        username=device['username'],
        password=device['password'],
        hostkey_verify=device['hostkey_verify'],
        allow_agent=device['allow_agent'],
        look_for_keys=device['look_for_keys'],
    ) as m:
        #edit_result = m.edit_config (target="running",config=delete_optical_band)
        result = m.get_config(source="running").data_xml

        # Passed as the extractors' second argument, which they currently
        # ignore (each builds its own prefix map).
        namespaces = {"oc": "http://openconfig.net/yang/wavelength-router"}
        obj = extract_media_channels(result, namespaces)
        obj1 = extract_optical_bands(result, namespaces)
        # road_info= extract_openroadm_info(result)
        # circuits=extract_roadm_circuits_pack(result)
        #print (f'edit result {edit_result}')
        print(f"result {result}")
        print(f"media_cahnnels {obj}")
        print(f"optical_bands {obj1}")

        #print(f"circuits {circuits}")
        # with open("context.log","w") as f:
        #      print (result,file=f)