Skip to content
TfsApiClient.py 11.6 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.client.RestClient import RestClient
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import json_service
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum

CONTEXT_IDS_URL = '/tfs-api/context_ids'
TOPOLOGY_URL    = '/tfs-api/context/{context_uuid:s}/topology_details/{topology_uuid:s}'
SERVICES_URL    = '/tfs-api/context/{context_uuid:s}/services'
SERVICE_URL     = '/tfs-api/context/{context_uuid:s}/service/{service_uuid:s}'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

MAPPING_STATUS = {
    'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
    'DEVICEOPERATIONALSTATUS_DISABLED' : 1,
    'DEVICEOPERATIONALSTATUS_ENABLED'  : 2,
}

MAPPING_DRIVER = {
    'DEVICEDRIVER_UNDEFINED'            : 0,
    'DEVICEDRIVER_OPENCONFIG'           : 1,
    'DEVICEDRIVER_TRANSPORT_API'        : 2,
    'DEVICEDRIVER_P4'                   : 3,
    'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4,
    'DEVICEDRIVER_ONF_TR_532'           : 5,
    'DEVICEDRIVER_XR'                   : 6,
    'DEVICEDRIVER_IETF_L2VPN'           : 7,
    'DEVICEDRIVER_GNMI_OPENCONFIG'      : 8,
    'DEVICEDRIVER_OPTICAL_TFS'          : 9,
    'DEVICEDRIVER_IETF_ACTN'            : 10,
    'DEVICEDRIVER_OC'                   : 11,
    'DEVICEDRIVER_QKD'                  : 12,
    'DEVICEDRIVER_IETF_L3VPN'           : 13,
    'DEVICEDRIVER_IETF_SLICE'           : 14,
    'DEVICEDRIVER_NCE'                  : 15,
}

LOGGER = logging.getLogger(__name__)

class TfsApiClient(RestClient):
    def __init__(
        self, address : str, port : int, scheme : str = 'http',
        username : Optional[str] = None, password : Optional[str] = None,
        timeout : Optional[int] = 30
    ) -> None:
        super().__init__(
            address, port, scheme=scheme, username=username, password=password,
            timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
        )

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def check_credentials(self) -> None:
        self.get(CONTEXT_IDS_URL)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('Credentials checked')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def get_devices_endpoints(
        self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
    ) -> List[Dict]:
        LOGGER.debug('[get_devices_endpoints] begin')
        MSG = '[get_devices_endpoints] import_topology={:s}'
        LOGGER.debug(MSG.format(str(import_topology)))

        if import_topology == ImportTopologyEnum.DISABLED:
            MSG = 'Unsupported import_topology mode: {:s}'
            raise Exception(MSG.format(str(import_topology)))

        topology = self.get(TOPOLOGY_URL.format(
            context_uuid=DEFAULT_CONTEXT_NAME, topology_uuid=DEFAULT_TOPOLOGY_NAME
        ))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        result = list()
        for json_device in topology['devices']:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_uuid : str = json_device['device_id']['device_uuid']['uuid']
            device_type : str = json_device['device_type']
            #if not device_type.startswith('emu-'): device_type = 'emu-' + device_type
            device_status = json_device['device_operational_status']
            device_url = '/devices/device[{:s}]'.format(device_uuid)
            device_data = {
                'uuid': json_device['device_id']['device_uuid']['uuid'],
                'name': json_device['name'],
                'type': device_type,
                'status': MAPPING_STATUS[device_status],
                'drivers': [
                    MAPPING_DRIVER[driver]
                    for driver in json_device['device_drivers']
                ],
            }
            result.append((device_url, device_data))

            for json_endpoint in json_device['device_endpoints']:
                endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid']
                endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid)
                endpoint_data = {
                    'device_uuid': device_uuid,
                    'uuid': endpoint_uuid,
                    'name': json_endpoint['name'],
                    'type': json_endpoint['endpoint_type'],
                }
                result.append((endpoint_url, endpoint_data))

        if import_topology == ImportTopologyEnum.DEVICES:
            LOGGER.debug('[get_devices_endpoints] devices only; returning')
            return result

        for json_link in topology['links']:
            link_uuid : str = json_link['link_id']['link_uuid']['uuid']
            link_url = '/links/link[{:s}]'.format(link_uuid)
            link_endpoint_ids = [
                (
                    json_endpoint_id['device_id']['device_uuid']['uuid'],
                    json_endpoint_id['endpoint_uuid']['uuid'],
                )
                for json_endpoint_id in json_link['link_endpoint_ids']
            ]
            link_data = {
                'uuid': json_link['link_id']['link_uuid']['uuid'],
                'name': json_link['name'],
                'endpoints': link_endpoint_ids,
            }
            result.append((link_url, link_data))
        for json_link in topology['optical_links']:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            link_uuid : str = json_link['link_id']['link_uuid']['uuid']
            link_url = '/links/link[{:s}]'.format(link_uuid)
            link_endpoint_ids = [
                (
                    json_endpoint_id['device_id']['device_uuid']['uuid'],
                    json_endpoint_id['endpoint_uuid']['uuid'],
                )
                for json_endpoint_id in json_link['link_endpoint_ids']
            ]
            link_data = {
                'uuid': json_link['link_id']['link_uuid']['uuid'],
                'name': json_link['name'],
                'endpoints': link_endpoint_ids,
            }
            result.append((link_url, link_data))

        LOGGER.debug('[get_devices_endpoints] topology; returning')
        return result

    def setup_service(self, resource_value : Dict) -> None:
        service_uuid      = resource_value['service_uuid'     ]
        service_name      = resource_value['service_name'     ]
        src_device_uuid   = resource_value['src_device_uuid'  ]
        src_endpoint_uuid = resource_value['src_endpoint_uuid']
        dst_device_uuid   = resource_value['dst_device_uuid'  ]
        dst_endpoint_uuid = resource_value['dst_endpoint_uuid']
        bitrate           = resource_value['bitrate'          ]
        bidir             = resource_value['bidir'            ]
        ob_width          = resource_value['ob_width'         ]

        endpoint_ids = [
            json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
            json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
        ]
        constraints = [
            json_constraint_custom('bandwidth[gbps]',  str(bitrate)),
            json_constraint_custom('bidirectionality', '1' if bidir else '0'),
        ]
        if service_name == 'IP1/PORT-xe1==IP2/PORT-xe1':
            constraints.append(json_constraint_custom('optical-band-width[GHz]', str(ob_width)))

        service_add = json_service(
            service_uuid,
            ServiceTypeEnum.Name(ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY),
            context_id = json_context_id(DEFAULT_CONTEXT_NAME),
            name = service_name,
            status = ServiceStatusEnum.Name(ServiceStatusEnum.SERVICESTATUS_PLANNED),
        )
        services_url = SERVICES_URL.format(context_uuid=DEFAULT_CONTEXT_NAME)
        service_ids = self.post(services_url, body=service_add)
        assert len(service_ids) == 1
        service_id = service_ids[0]
        service_uuid = service_id['service_uuid']['uuid']

        service_upd = json_service(
            service_uuid,
            ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY,
            context_id = json_context_id(DEFAULT_CONTEXT_NAME),
            name = service_name, endpoint_ids = endpoint_ids, constraints = constraints,
            status = ServiceStatusEnum.Name(ServiceStatusEnum.SERVICESTATUS_PLANNED),
        )
        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        self.put(service_url, body=service_upd)

    def teardown_service(self, resource_value : Dict) -> None:
        service_uuid = resource_value['service_uuid']
        service_name = resource_value['service_name']

        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        self.delete(service_url)
        if service_name == 'IP1/PORT-xe1==IP2/PORT-xe1':
            self.delete(service_url)

    @staticmethod
    def parse_service(service : Dict) -> Tuple[str, Dict]:
        service_uuid = service['service_id']['service_uuid']['uuid']
        src_endpoint_id = service['service_endpoint_ids'][ 0]
        dst_endpoint_id = service['service_endpoint_ids'][-1]
        parsed_service = {
            'service_uuid'     : service_uuid,
            'service_name'     : service['name'],
            'src_device_uuid'  : src_endpoint_id['device_id']['device_uuid']['uuid'],
            'src_endpoint_uuid': src_endpoint_id['endpoint_uuid']['uuid'],
            'dst_device_uuid'  : dst_endpoint_id['device_id']['device_uuid']['uuid'],
            'dst_endpoint_uuid': dst_endpoint_id['endpoint_uuid']['uuid'],
        }

        for constraint in service.get('service_constraints', list()):
            if 'custom' not in constraint: continue
            constraint_type  = constraint['custom']['constraint_type']
            constraint_value = constraint['custom']['constraint_value']
            if constraint_type == 'bandwidth[gbps]':
                parsed_service['bitrate'] = int(float(constraint_value))
            if constraint_type == 'bidirectionality':
                parsed_service['bidir'] = int(constraint_value) == 1
            if constraint_type == 'optical-band-width[GHz]':
                parsed_service['ob_width'] = int(constraint_value)

        resource_key = '/services/service[{:s}]'.format(service_uuid)
        return resource_key, parsed_service

    def get_services(self) -> List[Tuple[str, Dict]]:
        services_url = SERVICES_URL.format(context_uuid=DEFAULT_CONTEXT_NAME)
        _services = self.get(services_url)
        OPTICAL_CONNECTIVITY_SERVICE_TYPES = {
            'SERVICETYPE_OPTICAL_CONNECTIVITY',
            ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
        }
        return [
            TfsApiClient.parse_service(service)
            for service in _services['services']
            if service['service_type'] in OPTICAL_CONNECTIVITY_SERVICE_TYPES
        ]

    def get_service(self, service_uuid : str) -> Tuple[str, Dict]:
        service_url = SERVICE_URL.format(context_uuid=DEFAULT_CONTEXT_NAME, service_uuid=service_uuid)
        service = self.get(service_url)
        return TfsApiClient.parse_service(service)