Skip to content
Snippets Groups Projects
ServiceGenerator.py 14 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import logging, json, random, threading
    from typing import Dict, Optional, Set, Tuple
    from common.proto.context_pb2 import Empty
    from common.tools.object_factory.Constraint import json_constraint_custom
    from common.tools.object_factory.ConfigRule import json_config_rule_set
    from common.tools.object_factory.Device import json_device_id
    from common.tools.object_factory.EndPoint import json_endpoint_id
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.tools.object_factory.Service import (
        json_service_l2nm_planned, json_service_l3nm_planned, json_service_tapi_planned)
    
    from context.client.ContextClient import ContextClient
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .Constants import SERVICE_TYPE_L2NM, SERVICE_TYPE_L3NM, SERVICE_TYPE_TAPI
    from .Parameters import Parameters
    
    
    LOGGER = logging.getLogger(__name__)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    ENDPOINT_COMPATIBILITY = {
        'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:INPUT': 'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:OUTPUT',
        'PHOTONIC_MEDIA:DWDM:G_50GHZ:INPUT'  : 'PHOTONIC_MEDIA:DWDM:G_50GHZ:OUTPUT',
    }
    
    
    class ServiceGenerator:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def __init__(self, parameters : Parameters) -> None:
            self._parameters = parameters
    
            self._lock = threading.Lock()
            self._num_services = 0
            self._available_device_endpoints : Dict[str, Set[str]] = dict()
            self._used_device_endpoints : Dict[str, Dict[str, str]] = dict()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self._endpoint_ids_to_types : Dict[Tuple[str, str], str] = dict()
            self._endpoint_types_to_ids : Dict[str, Set[Tuple[str, str]]] = dict()
    
    
        def initialize(self) -> None:
            with self._lock:
                self._available_device_endpoints.clear()
                self._used_device_endpoints.clear()
    
                context_client = ContextClient()
    
                devices = context_client.ListDevices(Empty())
                for device in devices.devices:
                    device_uuid = device.device_id.device_uuid.uuid
                    _endpoints = self._available_device_endpoints.setdefault(device_uuid, set())
                    for endpoint in device.device_endpoints:
                        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        endpoint_type = endpoint.endpoint_type
    
                        _endpoints.add(endpoint_uuid)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        self._endpoint_ids_to_types.setdefault((device_uuid, endpoint_uuid), endpoint_type)
                        self._endpoint_types_to_ids.setdefault(endpoint_type, set()).add((device_uuid, endpoint_uuid))
    
    
                links = context_client.ListLinks(Empty())
                for link in links.links:
                    for endpoint_id in link.link_endpoint_ids:
                        device_uuid = endpoint_id.device_id.device_uuid.uuid
                        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                        _endpoints = self._available_device_endpoints.get(device_uuid, set())
                        _endpoints.discard(endpoint_uuid)
                        if len(_endpoints) == 0: self._available_device_endpoints.pop(device_uuid, None)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        endpoint_type = self._endpoint_ids_to_types.pop((device_uuid, endpoint_uuid), None)
                        if endpoint_type is None: continue
    
                        if endpoint_type not in self._endpoint_types_to_ids: continue
                        endpoints_for_type = self._endpoint_types_to_ids[endpoint_type]
                        endpoint_key = (device_uuid, endpoint_uuid)
                        if endpoint_key not in endpoints_for_type: continue
                        endpoints_for_type.discard(endpoint_key)
    
    
        @property
        def num_services_generated(self): return self._num_services
    
        def dump_state(self) -> None:
            with self._lock:
                _endpoints = {
                    device_uuid:[endpoint_uuid for endpoint_uuid in endpoint_uuids]
                    for device_uuid,endpoint_uuids in self._available_device_endpoints.items()
                }
                LOGGER.info('[dump_state] available_device_endpoints = {:s}'.format(json.dumps(_endpoints)))
                LOGGER.info('[dump_state] used_device_endpoints = {:s}'.format(json.dumps(self._used_device_endpoints)))
    
        def _use_device_endpoint(
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self, service_uuid : str, endpoint_types : Optional[Set[str]] = None, exclude_device_uuids : Set[str] = set()
    
        ) -> Optional[Tuple[str, str]]:
            with self._lock:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                compatible_endpoints : Set[Tuple[str, str]] = set()
                elegible_device_endpoints : Dict[str, Set[str]] = {}
    
                if endpoint_types is None:
                    # allow all
                    elegible_device_endpoints : Dict[str, Set[str]] = {
                        device_uuid:device_endpoint_uuids
                        for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items()
                        if device_uuid not in exclude_device_uuids and len(device_endpoint_uuids) > 0
                    }
                else:
                    # allow only compatible endpoints
                    for endpoint_type in endpoint_types:
                        if endpoint_type not in self._endpoint_types_to_ids: continue
                        compatible_endpoints.update(self._endpoint_types_to_ids[endpoint_type])
    
                    for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items():
                        if device_uuid in exclude_device_uuids or len(device_endpoint_uuids) == 0: continue
                        for endpoint_uuid in device_endpoint_uuids:
                            endpoint_key = (device_uuid,endpoint_uuid)
                            if endpoint_key not in compatible_endpoints: continue
                            elegible_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
    
                if len(elegible_device_endpoints) == 0:
                    LOGGER.warning(' '.join([
                        '>> No endpoint is available:',
                        'endpoint_types={:s}'.format(str(endpoint_types)),
                        'exclude_device_uuids={:s}'.format(str(exclude_device_uuids)),
                        'self._endpoint_types_to_ids={:s}'.format(str(self._endpoint_types_to_ids)),
                        'self._available_device_endpoints={:s}'.format(str(self._available_device_endpoints)),
                        'compatible_endpoints={:s}'.format(str(compatible_endpoints)),
                    ]))
                    return None
    
                device_uuid = random.choice(list(elegible_device_endpoints.keys()))
                device_endpoint_uuids = elegible_device_endpoints.get(device_uuid)
                endpoint_uuid = random.choice(list(device_endpoint_uuids))
                self._available_device_endpoints.setdefault(device_uuid, set()).discard(endpoint_uuid)
                self._used_device_endpoints.setdefault(device_uuid, dict())[endpoint_uuid] = service_uuid
                return device_uuid, endpoint_uuid
    
        def _release_device_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None:
            with self._lock:
                self._used_device_endpoints.setdefault(device_uuid, set()).pop(endpoint_uuid, None)
                self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
    
        def compose_service(self) -> Optional[Dict]:
            with self._lock:
                self._num_services += 1
                num_service = self._num_services
            #service_uuid = str(uuid.uuid4())
            service_uuid = 'svc_{:d}'.format(num_service)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            
            # choose service type
            service_type = random.choice(self._parameters.service_types)
    
            # choose source endpoint
            src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if service_type in {SERVICE_TYPE_TAPI} else None
            src = self._use_device_endpoint(service_uuid, endpoint_types=src_endpoint_types)
            if src is None:
                LOGGER.warning('>> No source endpoint is available')
                return None
    
            src_device_uuid,src_endpoint_uuid = src
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            # identify compatible destination endpoint types
            src_endpoint_type = self._endpoint_ids_to_types.get((src_device_uuid,src_endpoint_uuid))
            dst_endpoint_type = ENDPOINT_COMPATIBILITY.get(src_endpoint_type)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            dst_endpoint_types = {dst_endpoint_type} if service_type in {SERVICE_TYPE_TAPI} else None
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            # identify expluded destination devices
            exclude_device_uuids = {} if service_type in {SERVICE_TYPE_TAPI} else {src_device_uuid}
    
            # choose feasible destination endpoint
            dst = self._use_device_endpoint(
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                service_uuid, endpoint_types=dst_endpoint_types, exclude_device_uuids=exclude_device_uuids)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            
            # if destination endpoint not found, release source, and terminate current service generation
    
            if dst is None:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                LOGGER.warning('>> No destination endpoint is available')
    
                self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
                return None
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            # compose endpoints
    
            dst_device_uuid,dst_endpoint_uuid = dst
            endpoint_ids = [
                json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
                json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
            ]
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            if service_type == SERVICE_TYPE_L2NM:
                constraints = [
                    json_constraint_custom('bandwidth[gbps]', '10.0'),
                    json_constraint_custom('latency[ms]',     '20.0'),
                ]
                vlan_id = num_service % 1000
                circuit_id = '{:03d}'.format(vlan_id)
                src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
                dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
                config_rules = [
                    json_config_rule_set('/settings', {
                        'mtu': 1512
                    }),
                    json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
                        'router_id': src_router_id,
                        'sub_interface_index': vlan_id,
                        'vlan_id': vlan_id,
                        'remote_router': dst_router_id,
                        'circuit_id': circuit_id,
                    }),
                    json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
                        'router_id': dst_router_id,
                        'sub_interface_index': vlan_id,
                        'vlan_id': vlan_id,
                        'remote_router': src_router_id,
                        'circuit_id': circuit_id,
                    }),
                ]
                return json_service_l2nm_planned(
                    service_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
    
            elif service_type == SERVICE_TYPE_L3NM:
                constraints = [
                    json_constraint_custom('bandwidth[gbps]', '10.0'),
                    json_constraint_custom('latency[ms]',     '20.0'),
                ]
                vlan_id = num_service % 1000
                bgp_as = 60000 + (num_service % 10000)
                bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
                route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
                src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
                dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
                src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
                dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))
                config_rules = [
                    json_config_rule_set('/settings', {
                        'mtu'             : 1512,
                        'bgp_as'          : bgp_as,
                        'bgp_route_target': bgp_route_target,
                    }),
                    json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
                        'router_id'          : src_router_id,
                        'route_distinguisher': route_distinguisher,
                        'sub_interface_index': vlan_id,
                        'vlan_id'            : vlan_id,
                        'address_ip'         : src_address_ip,
                        'address_prefix'     : 16,
                    }),
                    json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
                        'router_id'          : dst_router_id,
                        'route_distinguisher': route_distinguisher,
                        'sub_interface_index': vlan_id,
                        'vlan_id'            : vlan_id,
                        'address_ip'         : dst_address_ip,
                        'address_prefix'     : 16,
                    }),
                ]
                return json_service_l3nm_planned(
                    service_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
    
            elif service_type == SERVICE_TYPE_TAPI:
                config_rules = [
                    json_config_rule_set('/settings', {
                        'capacity_value'  : 50.0,
                        'capacity_unit'   : 'GHz',
                        'layer_proto_name': 'PHOTONIC_MEDIA',
                        'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
                        'direction'       : 'UNIDIRECTIONAL',
                    }),
                ]
                return json_service_tapi_planned(
                    service_uuid, endpoint_ids=endpoint_ids, constraints=[], config_rules=config_rules)
    
    
        def release_service(self, json_service : Dict) -> None:
            for endpoint_id in json_service['service_endpoint_ids']:
                device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
                endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
                self._release_device_endpoint(device_uuid, endpoint_uuid)