Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • cca3e72cbaf1f6d84d0a24ebf09d93f6cc56f587
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit_tapi
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/307-update-python-version-service
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

test.py

Blame
  • ismaeel's avatar
    Mohammad Ismaeel authored
    996dad7e
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test.py 7.75 KiB
    from ncclient import manager
    from ncclient.xml_ import *
    import lxml.etree as ET
    import re
    from typing import Optional, Union
    from uuid import UUID, uuid4, uuid5
    import logging
    
    
    # NETCONF <config> payload in the openconfig wavelength-router namespace that
    # describes a single media channel: index 1, name C_BAND, lower/upper frequency
    # 192006250/192056250, source port-name 111, destination port-name 2, and an
    # optical-band-parent of 1 (that leaf lives in the flex-scale namespace).
    # NOTE(review): not referenced anywhere else in the visible file; presumably
    # meant to be sent with edit_config -- confirm before relying on it.
    create_media='''
    <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
      <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">
        <media-channels>
         <channel>
           <index>1</index>
           <config>
            <index>1</index>
            <name>C_BAND</name>
            <lower-frequency>192006250</lower-frequency>
            <upper-frequency>192056250</upper-frequency>
            <optical-band-parent xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">1</optical-band-parent>
           </config>
           <source>
             <config>
              <port-name>111</port-name>
             </config>
           </source>
           <dest>
            <config>
             <port-name>2</port-name>
            </config>
           </dest>
          </channel>
       </media-channels>
      </wavelength-router>
    </config>
    '''
    
    # NETCONF <config> payload that marks the optical-band with index 1 (flex-scale
    # namespace, nested under the openconfig wavelength-router) with
    # operation="delete". Only mentioned in a commented-out edit_config call in the
    # __main__ block of this file.
    delete_optical_band= '''
    <config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"> 
     <wavelength-router xmlns="http://openconfig.net/yang/wavelength-router">   
       <optical-bands xmlns="http://flex-scale-project.eu/yang/flex-scale-mg-on">     
          <optical-band operation="delete">      
            <index>1</index>        
            <config>          
              <index>1</index>   
            </config>      
          </optical-band>    
       </optical-bands>  
     </wavelength-router>
    </config>
    '''
    
    def extract_channel_xmlns(data_xml: str, is_opticalband: bool) -> Optional[str]:
        """Return the namespace URI of the first matching element in *data_xml*.

        The document is searched (below its root) for the first element whose
        local name is ``optical-bands`` when *is_opticalband* is true, or
        ``optical-channel`` otherwise, regardless of which namespace it is in.

        Args:
            data_xml: XML document as text; it is encoded to UTF-8 bytes before
                parsing.
            is_opticalband: selects which local name to look for (see above).

        Returns:
            The namespace URI of the element found, ``""`` if that element has
            no namespace, or ``None`` if no such element exists.
        """
        root = ET.fromstring(data_xml.encode("utf-8"))

        local_name = "optical-bands" if is_opticalband else "optical-channel"
        # '{*}' matches the local name in any namespace, so no prefix map is needed.
        element = root.find('.//{*}' + local_name)
        if element is None:
            return None

        tag = element.tag
        if not tag.startswith("{"):
            # Un-namespaced element: the tag is just the local name.
            return ""
        # Tags are spelled '{uri}local'. Split on the first closing brace rather
        # than str.replace()-ing the local name away, which would also eat any
        # occurrence of that name inside the URI itself.
        return tag[1:].partition("}")[0]
      
    def extract_optical_bands (data_xml:str,namespace:str):
        """Collect one dict per ``optical-band`` element found in *data_xml*.

        All lookups use the flex-scale namespace hard-coded below; the
        *namespace* argument is accepted but never read.

        Each dict has the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port`` and
        ``channel_index``. A value is the text of the first matching descendant
        element, or ``None`` when there is no such element.

        Returns an empty list when the document has no ``optical-bands``
        container below its root.
        """
        ns_map = {"oc": "http://flex-scale-project.eu/yang/flex-scale-mg-on"}
        # (result key, path relative to one optical-band element), in output order.
        field_paths = (
            ('band_name', './/oc:name'),
            ('lower_frequency', './/oc:lower-frequency'),
            ('upper_frequency', './/oc:upper-frequency'),
            ('admin_status', './/oc:admin-status'),
            ('src_port', './/oc:source/oc:config/oc:port-name'),
            ('dest_port', './/oc:dest/oc:config/oc:port-name'),
            ('channel_index', './/oc:index'),
        )

        tree_root = ET.fromstring(data_xml.encode("utf-8"))
        container = tree_root.find('.//oc:optical-bands', ns_map)
        if container is None:
            return []

        def _text_at(node, path):
            hit = node.find(path, ns_map)
            return None if hit is None else hit.text

        return [
            {key: _text_at(band, path) for key, path in field_paths}
            for band in container.findall('.//oc:optical-band', ns_map)
        ]
            
    def extract_media_channels (data_xml:str,namespace:str):
        """Collect one dict per media ``channel`` element found in *data_xml*.

        Channels are looked up in the openconfig wavelength-router namespace,
        and the ``optical-band-parent`` leaf in the flex-scale namespace; both
        are hard-coded below. The *namespace* argument is accepted but never
        read.

        Each dict has the keys ``band_name``, ``lower_frequency``,
        ``upper_frequency``, ``admin_status``, ``src_port``, ``dest_port``,
        ``optical_band_parent`` and ``channel_index``. A value is the text of
        the first matching descendant element, or ``None`` when there is none.

        Returns an empty list when the document has no ``media-channels``
        container below its root.
        """
        ns_map = {
            "oc": "http://openconfig.net/yang/wavelength-router",
            'ob_parent': "http://flex-scale-project.eu/yang/flex-scale-mg-on",
        }
        # Result key -> path relative to one channel element (dict order = output order).
        paths_by_key = {
            'band_name': './/oc:name',
            'lower_frequency': './/oc:lower-frequency',
            'upper_frequency': './/oc:upper-frequency',
            'admin_status': './/oc:admin-status',
            'src_port': './/oc:source/oc:config/oc:port-name',
            'dest_port': './/oc:dest/oc:config/oc:port-name',
            'optical_band_parent': './/ob_parent:optical-band-parent',
            'channel_index': './/oc:index',
        }

        document = ET.fromstring(data_xml.encode("utf-8"))
        channels_parent = document.find('.//oc:media-channels', ns_map)

        collected = []
        if channels_parent is None:
            return collected

        for channel in channels_parent.findall('.//oc:channel', ns_map):
            record = {}
            for key, path in paths_by_key.items():
                match = channel.find(path, ns_map)
                record[key] = match.text if match is not None else None
            collected.append(record)
        return collected
                  
            
    
    
    # Connection settings read by the __main__ block when it calls manager.connect.
    # NOTE(review): address and credentials are hard-coded and host-key
    # verification is switched off -- presumably a lab/test target; confirm.
    device = dict(
        host='10.0.2.4',                      # address of the remote device
        port=2026,                            # port the session connects to
        username='admin',
        password='admin',
        device_params=dict(name='default'),   # defined here but not forwarded by the __main__ block
        hostkey_verify=False,
        allow_agent=False,
        look_for_keys=False,
    )
    
    
          
    if __name__ == '__main__':
        # Script entry point: open a NETCONF session to `device`, fetch the running
        # configuration, and print it together with the media channels and optical
        # bands extracted from it.

        # Only these settings are handed to manager.connect; device['device_params']
        # is not forwarded.
        connect_keys = ('host', 'port', 'username', 'password',
                        'hostkey_verify', 'allow_agent', 'look_for_keys')
        connect_kwargs = {key: device[key] for key in connect_keys}

        with manager.connect(**connect_kwargs) as session:
            # An edit_config(target="running", config=delete_optical_band) call can be
            # issued here first when an optical band has to be removed.
            running_xml = session.get_config(source="running").data_xml

            # NOTE(review): the extractors declare this argument as `namespace: str`
            # and do not read it; a prefix map is passed here regardless.
            namespaces = {"oc": "http://openconfig.net/yang/wavelength-router"}
            media_channels = extract_media_channels(running_xml, namespaces)
            optical_bands = extract_optical_bands(running_xml, namespaces)

            print(f"result {running_xml}")
            print(f"media_cahnnels {media_channels}")
            print(f"optical_bands {optical_bands}")