Skip to content
Snippets Groups Projects
Tools.py 10.2 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re,logging
import json
import lxml.etree as ET
from typing import Collection, Dict, Any

from yattag import Doc, indent
from .VPN.physical import create_optical_channel
def add_value_from_tag(target : Dict, field_name: str, field_value : ET.Element, cast=None) -> None:
    """Store the text of an XML element in ``target`` under ``field_name``.

    Nothing is stored when ``field_value`` is ``None``, is a plain string, or
    is an element without text. When ``cast`` is given, the text is passed
    through it and the converted value is stored instead.
    """
    if field_value is None or isinstance(field_value, str):
        return
    text = field_value.text
    if text is None:
        return
    target[field_name] = text if cast is None else cast(text)

def add_value_from_collection(target : Dict, field_name: str, field_value : Collection) -> None:
    """Store ``field_value`` in ``target`` under ``field_name`` unless it is ``None`` or empty."""
    if field_value is not None and len(field_value) != 0:
        target[field_name] = field_value

"""
# Method Name: generate_templates
  
# Parameters:
  - resource_key:   [str]  Variable to identify the rule to be executed.
  - resource_value: [str]  Variable with the configuration parameters of the rule to be executed.
  - channel:        [str]  Name of the channel the rule applies to.
  
# Functionality:
  This method generates the template to configure the equipment.
  To generate the template the following steps are performed:
  1) Store "channel", "resource_key" and "resource_value" in a dictionary under the keys "name", "resource_key" and "value".
  2) Call create_physical_config with that dictionary.
  3) Append the generated template to the list of results.
  
# Return:
  [list] Set of templates generated according to the configuration rule
"""
def generate_templates(resource_key: str, resource_value: str, channel:str) -> list:    # template management to be configured
    """Build the configuration templates for one rule of one channel.

    Parameters:
      - resource_key:   [str] Identifies the rule to be executed.
      - resource_value: [str] Configuration value of the rule.
      - channel:        [str] Channel name; passed on under the "name" key.

    Return:
      [list] Single-element list holding the value returned by
      create_physical_config for the assembled dictionary.
    """
    # NOTE(review): create_physical_config is neither defined nor imported in the
    # visible part of this module (only create_optical_channel is imported) -
    # confirm where it is expected to come from.
    data = {
        'name': channel,
        'resource_key': resource_key,
        'value': resource_value,
    }
    return [create_physical_config(data)]
def extract_channel_xmlns (data_xml:str,is_opticalband:bool):
    """Return the XML namespace URI of the channel container element.

    Parameters:
      - data_xml:       [str]  XML document to inspect.
      - is_opticalband: [bool] When true the first "optical-bands" element is
                               looked up, otherwise the first "optical-channel".

    Return:
      [str | None] Namespace URI of the first matching element, "" when that
      element carries no namespace, or None when no such element exists.
    """
    # Bytes are parsed (not the str) so documents carrying an XML encoding
    # declaration are accepted by the parser.
    root = ET.fromstring(data_xml.encode("utf-8"))

    local_name = "optical-bands" if is_opticalband else "optical-channel"
    # "{*}" matches the local name in any namespace, so no prefix map is needed.
    element = root.find('.//{*}' + local_name)
    if element is None:
        return None

    tag = element.tag
    if not tag.startswith("{"):
        return ""
    # Slice the "{uri}" prefix instead of str.replace(): a URI that itself
    # contains the local name (e.g. ".../optical-channel/v1") must stay intact.
    return tag[1:tag.index("}")]
def extract_channels_based_on_channelnamespace (xml_data:str,channel_namespace:str,is_opticalband:bool):
    """List the channels (or optical bands) found in an XML document.

    Parameters:
      - xml_data:          [str]  XML document to inspect.
      - channel_namespace: [str]  Namespace URI of the channel / band elements.
      - is_opticalband:    [bool] Selects optical-band instead of channel lookup.

    Return:
      [list] For optical bands, one {"index", "endpoints"} dict per
      "optical-band" element, where "endpoints" is the tuple
      (source port-name, dest port-name). Otherwise one {"index": name} dict
      per platform component that has an "optical-channel" child.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))

    if is_opticalband:
        band_ns = {
              'wr': 'http://openconfig.net/yang/wavelength-router',
              'fs': channel_namespace
        }
        bands = []
        for band in root.findall('.//fs:optical-band', namespaces=band_ns):
            band_index = band.find('.//fs:index', band_ns).text
            dest_port = band.find('.//fs:dest/fs:config/fs:port-name', band_ns).text
            source_port = band.find('.//fs:source/fs:config/fs:port-name', band_ns).text
            bands.append({"index": band_index, "endpoints": (source_port, dest_port)})
        return bands

    # Components whose direct children include an "optical-channel" element.
    platform_ns = {'namespace': 'http://openconfig.net/yang/platform', 'cn': channel_namespace}
    holders = root.findall('.//namespace:component[cn:optical-channel]', platform_ns)
    return [{"index": holder.find('namespace:name', platform_ns).text} for holder in holders]
def extract_channels_based_on_type (xml_data:str):
    """Return the names of platform components typed as optical channels.

    A component qualifies when a "state/type" element below it has the text
    "oc-opt-types:OPTICAL_CHANNEL"; components without a direct "name" child
    are skipped.
    """
    wanted_type = 'oc-opt-types:OPTICAL_CHANNEL'
    ns_map = {'oc': 'http://openconfig.net/yang/platform', 'typex': 'http://openconfig.net/yang/platform-types'}

    root = ET.fromstring(xml_data.encode("utf-8"))
    found = []
    for comp in root.findall('.//oc:component', ns_map):
        type_node = comp.find('.//oc:state/oc:type[.="oc-opt-types:OPTICAL_CHANNEL"]', namespaces=ns_map)
        if type_node is None or type_node.text != wanted_type:
            continue
        name_node = comp.find('oc:name', ns_map)
        if name_node is None:
            continue
        found.append(name_node.text)
    return found
    
def extract_value(resource_key:str,xml_data:str,dic:dict,channel_name:str,channel_namespace:str):
    """Copy one channel parameter from the XML document into ``dic``.

    The platform component whose name equals ``channel_name`` is located and
    the first descendant ``resource_key`` element in ``channel_namespace`` is
    read; its text is stored as ``dic[resource_key]``. ``dic`` is left
    untouched when the parameter is absent, and a message is printed when the
    component itself is not found. ``dic`` is returned in every case.
    """
    ns_map = {'oc': 'http://openconfig.net/yang/platform',
              'td': channel_namespace}
    root = ET.fromstring(xml_data.encode("utf-8"))

    component = root.find(f'.//oc:component[oc:name="{channel_name}"]', ns_map)
    if component is None:
        print(" element not found.")
        return dic

    node = component.find(f'.//td:{resource_key}', ns_map)
    if node is not None:
        dic[resource_key] = node.text
    return dic


def extract_port_value (xml_string:str,port_name:str):
    """Return the "onos-index" property value of a named platform component.

    Parameters:
      - xml_string: [str] XML document to inspect.
      - port_name:  [str] Name of the component to look up.

    Return:
      [tuple] (port_name, onos_index). onos_index is the text of the "value"
      element that sits next to a property-state "name" equal to "onos-index";
      it is None when the component or that property is not present.
    """
    root = ET.fromstring(xml_string.encode("utf-8"))

    namespace = {"oc": "http://openconfig.net/yang/platform"}
    component = root.find(f".//oc:component[oc:name='{port_name}']", namespace)
    if component is None:
        # Unknown port: report "no index" instead of failing on None.
        return (port_name, None)

    # Select the state whose name is "onos-index", then read its sibling value.
    value_element = component.find(
        ".//oc:property//oc:state/oc:name[.='onos-index']/../oc:value", namespace
    )
    onos_index = None if value_element is None else value_element.text

    return (port_name, onos_index)
            
           

  
def extract_tranceiver (data_xml:str,dic:dict):
    """Record the names of all transceiver components under ``dic['transceiver']``.

    A component counts as a transceiver when its "state" element has a "type"
    child equal to "oc-platform-types:TRANSCEIVER"; the text of that
    component's "state/name" element is collected. ``dic`` is updated in place
    and returned.
    """
    ns_map = {
      'ns': 'urn:ietf:params:xml:ns:netconf:base:1.0',
      'oc': 'http://openconfig.net/yang/platform',
      'oc-terminal': 'http://openconfig.net/yang/terminal-device',
      'oc-platform-types': 'http://openconfig.net/yang/platform-types'
    }
    root = ET.fromstring(data_xml.encode("utf-8"))

    # Path: filter state elements by type, step up to the component, then
    # descend again to state/name.
    names = []
    for name_node in root.findall('.//oc:component/oc:state/[oc:type="oc-platform-types:TRANSCEIVER"]../oc:state/oc:name', ns_map):
        names.append(name_node.text)

    dic['transceiver'] = names
    return dic
def extract_interface (xml_data:str,dic:dict):
    """Describe the first OpenConfig interface of the document in ``dic['interface']``.

    Parameters:
      - xml_data: [str]  XML document to inspect.
      - dic:      [dict] Dictionary that receives the "interface" entry.

    Return:
      [dict] ``dic``. ``dic['interface']`` is {} when the document has no
      interface; otherwise it holds "name", "enabled", "ip" and
      "prefix-length", each being the element text or None when the
      corresponding element is missing.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    namespaces = {
    'ns': 'urn:ietf:params:xml:ns:netconf:base:1.0',
    'oc': 'http://openconfig.net/yang/interfaces',
    }
    ip_namespaces = {
    'oc': 'http://openconfig.net/yang/interfaces',
    'ip': 'http://openconfig.net/yang/interfaces/ip',
    }

    # Only the first interface is reported, so only that one is inspected;
    # incomplete later interfaces can no longer break the extraction.
    first_interface = root.find('.//oc:interfaces/oc:interface', namespaces)
    if first_interface is None:
        dic['interface'] = {}
        return dic

    def _text(element):
        # Missing elements yield None instead of an AttributeError.
        return None if element is None else element.text

    # NOTE(review): ip / prefix-length are the first ones found anywhere in the
    # document (as before), not necessarily those of the first interface.
    dic['interface'] = {
        "name": _text(first_interface.find('oc:name', namespaces)),
        'ip': _text(root.find('.//ip:ip', ip_namespaces)),
        'enabled': _text(first_interface.find('oc:config/oc:enabled', namespaces)),
        "prefix-length": _text(root.find('.//ip:prefix-length', ip_namespaces)),
    }
    return dic
def has_opticalbands(xml_data:str):
    """Tell whether the document has a non-empty "optical-bands" element.

    The first "optical-bands" element in any namespace is considered; the
    result is True only when that element has at least one child.
    """
    root = ET.fromstring(xml_data.encode("utf-8"))
    bands = root.find('.//{*}optical-bands')
    if bands is None:
        return False
    return len(bands) > 0
    
def extractor(data_xml:str,resource_keys:list,dic:dict):
    """Extract transceivers, interface, channel parameters and endpoints from a device XML.

    Parameters:
      - data_xml:      [str]  XML document to inspect.
      - resource_keys: [list] Channel parameters to read for every optical
                              channel; the key 'interface' is skipped.
      - dic:           [dict] Not used; kept for interface compatibility.

    Return:
      [list] [transceivers_dic, interfaces_dic, lst_dic, channel_namespace, endpoints]
        - transceivers_dic / interfaces_dic: results of extract_tranceiver and
          extract_interface on fresh dictionaries.
        - lst_dic: one dictionary of extracted values per optical channel
          (empty for optical-band documents).
        - channel_namespace: result of extract_channel_xmlns.
        - endpoints: for optical-band documents, the list returned by
          extract_channels_based_on_channelnamespace; otherwise one
          {"endpoint_uuid": {"uuid": ...}} entry per channel.
    """
    is_opticalband=has_opticalbands(xml_data=data_xml)
    channel_namespace=extract_channel_xmlns(data_xml=data_xml,is_opticalband=is_opticalband)
    channel_names= extract_channels_based_on_channelnamespace(xml_data=data_xml,channel_namespace=channel_namespace,is_opticalband=is_opticalband)

    endpoints=[]
    lst_dic=[]
    if is_opticalband:
        endpoints=channel_names
    else:
        for channel_name in channel_names:
            # Channel entries are {"index": <component name>} dictionaries.
            # extract_value builds its XPath from the component name, so the
            # plain name must be passed: the dictionary itself can never match.
            lookup_name=channel_name["index"] if isinstance(channel_name, dict) else channel_name

            channel_values={}
            for resource_key in resource_keys:
                if resource_key != 'interface':
                    channel_values=extract_value(dic=channel_values,resource_key=resource_key,xml_data=data_xml,channel_name=lookup_name,channel_namespace=channel_namespace)

            # NOTE(review): "name" and "uuid" still carry the whole channel entry
            # (as before) so the returned shape is unchanged - confirm whether
            # callers expect the plain name here.
            channel_values["name"]=channel_name
            endpoints.append({"endpoint_uuid":{"uuid":channel_name}})
            lst_dic.append(channel_values)

    transceivers_dic=extract_tranceiver(data_xml=data_xml,dic={})
    interfaces_dic=extract_interface(xml_data=data_xml,dic={})

    return [transceivers_dic,interfaces_dic,lst_dic,channel_namespace,endpoints]