# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# RFC 8466 - L2VPN Service Model (L2SM)
# Ref: https://datatracker.ietf.org/doc/html/rfc8466

import json
from typing import Any, Dict, Optional, Tuple
from common.proto.context_pb2 import Constraint, EndPointId
from common.tools.grpc.Tools import grpc_message_to_json_string


def _endpoint_key(endpoint_id : EndPointId) -> Tuple[str, str, str, str]:
    # Flatten an EndPointId into (context, topology, device, endpoint) UUIDs so that
    # two endpoint identifiers can be compared with a single tuple comparison.
    return (
        endpoint_id.topology_id.context_id.context_uuid.uuid,
        endpoint_id.topology_id.topology_uuid.uuid,
        endpoint_id.device_id.device_uuid.uuid,
        endpoint_id.endpoint_uuid.uuid,
    )


def update_constraint_custom(constraints, constraint_type : str, fields : Dict[str, Tuple[Any, bool]]) -> Constraint:
    """Create or update the custom constraint of type `constraint_type` within `constraints`.

    The constraint value is a JSON-encoded object. The first custom constraint whose
    `constraint_type` matches is updated; if none matches, a new one is added through
    `constraints.add()`.

    Args:
        constraints: iterable container of constraints that also provides `add()`.
        constraint_type: value matched against / stored in `custom.constraint_type`.
        fields: Dict[field_name : str, Tuple[field_value : Any, raise_if_differs : bool]].
            A field that is missing, or whose `raise_if_differs` is False, is added or
            overwritten. A field that exists with a different value while
            `raise_if_differs` is True triggers an exception.

    Returns:
        The constraint that was found or added, with its JSON value updated.

    Raises:
        Exception: an existing field differs from the specified value and
            `raise_if_differs` is True for that field.
    """
    for constraint in constraints:
        if constraint.WhichOneof('constraint') != 'custom': continue
        if constraint.custom.constraint_type != constraint_type: continue
        json_constraint_value = json.loads(constraint.custom.constraint_value)
        break   # found, end loop
    else:
        # not found, add it
        constraint = constraints.add()      # pylint: disable=no-member
        constraint.custom.constraint_type = constraint_type
        json_constraint_value = {}

    for field_name,(field_value, raise_if_differs) in fields.items():
        if (field_name not in json_constraint_value) or not raise_if_differs:
            # missing or raise_if_differs=False, add/update it
            json_constraint_value[field_name] = field_value
        elif json_constraint_value[field_name] != field_value:
            # exists, differs, and raise_if_differs=True
            msg = 'Specified {:s}({:s}) differs existing value({:s})'
            raise Exception(msg.format(str(field_name), str(field_value), str(json_constraint_value[field_name])))

    # sort_keys=True keeps the serialized value independent of insertion order
    constraint.custom.constraint_value = json.dumps(json_constraint_value, sort_keys=True)

    # Return the constraint, as declared by the signature and done by the other update_* helpers
    return constraint


def update_constraint_endpoint_location(
    constraints, endpoint_id : EndPointId,
    region : Optional[str] = None, gps_position : Optional[Tuple[float, float]] = None
) -> Constraint:
    """Create or update the endpoint_location constraint of `endpoint_id` within `constraints`.

    Args:
        constraints: iterable container of constraints that also provides `add()`.
        endpoint_id: endpoint the location applies to; matched by its endpoint, device,
            topology and context UUIDs.
        region: region name to store in the location, if given.
        gps_position: (latitude, longitude) to store in the location, if given.

    Returns:
        The constraint that was found or added. When neither `region` nor
        `gps_position` is given, its location is left untouched.

    Raises:
        Exception: both `region` and `gps_position` are provided.
    """
    # gps_position: (latitude, longitude)
    if region is not None and gps_position is not None:
        raise Exception('Only one of region/gps_position can be provided')

    endpoint_key = _endpoint_key(endpoint_id)
    context_uuid, topology_uuid, device_uuid, endpoint_uuid = endpoint_key

    for constraint in constraints:
        if constraint.WhichOneof('constraint') != 'endpoint_location': continue
        if _endpoint_key(constraint.endpoint_location.endpoint_id) != endpoint_key: continue
        break   # found, end loop
    else:
        # not found, add it
        constraint = constraints.add()      # pylint: disable=no-member
        _endpoint_id = constraint.endpoint_location.endpoint_id
        _endpoint_id.endpoint_uuid.uuid = endpoint_uuid
        _endpoint_id.device_id.device_uuid.uuid = device_uuid
        _endpoint_id.topology_id.topology_uuid.uuid = topology_uuid
        _endpoint_id.topology_id.context_id.context_uuid.uuid = context_uuid

    location = constraint.endpoint_location.location
    if region is not None:
        location.region = region
    elif gps_position is not None:
        location.gps_position.latitude = gps_position[0]
        location.gps_position.longitude = gps_position[1]
    return constraint


def update_constraint_endpoint_priority(constraints, endpoint_id : EndPointId, priority : int) -> Constraint:
    """Create or update the endpoint_priority constraint of `endpoint_id` within `constraints`.

    Args:
        constraints: iterable container of constraints that also provides `add()`.
        endpoint_id: endpoint the priority applies to; matched by its endpoint, device,
            topology and context UUIDs.
        priority: value stored in `endpoint_priority.priority`.

    Returns:
        The constraint that was found or added, with its priority set.
    """
    endpoint_key = _endpoint_key(endpoint_id)
    context_uuid, topology_uuid, device_uuid, endpoint_uuid = endpoint_key

    for constraint in constraints:
        if constraint.WhichOneof('constraint') != 'endpoint_priority': continue
        if _endpoint_key(constraint.endpoint_priority.endpoint_id) != endpoint_key: continue
        break   # found, end loop
    else:
        # not found, add it
        constraint = constraints.add()      # pylint: disable=no-member
        _endpoint_id = constraint.endpoint_priority.endpoint_id
        _endpoint_id.endpoint_uuid.uuid = endpoint_uuid
        _endpoint_id.device_id.device_uuid.uuid = device_uuid
        _endpoint_id.topology_id.topology_uuid.uuid = topology_uuid
        _endpoint_id.topology_id.context_id.context_uuid.uuid = context_uuid

    constraint.endpoint_priority.priority = priority
    return constraint


def update_constraint_sla_availability(constraints, num_disjoint_paths : int, all_active : bool) -> Constraint:
    """Create or update the sla_availability constraint within `constraints`.

    The first constraint of kind sla_availability is updated; if there is none, a new
    one is added through `constraints.add()`.

    Args:
        constraints: iterable container of constraints that also provides `add()`.
        num_disjoint_paths: value stored in `sla_availability.num_disjoint_paths`.
        all_active: value stored in `sla_availability.all_active`.

    Returns:
        The constraint that was found or added, with both fields set.
    """
    for constraint in constraints:
        if constraint.WhichOneof('constraint') != 'sla_availability': continue
        break   # found, end loop
    else:
        # not found, add it
        constraint = constraints.add()      # pylint: disable=no-member

    constraint.sla_availability.num_disjoint_paths = num_disjoint_paths
    constraint.sla_availability.all_active = all_active
    return constraint


def copy_constraints(source_constraints, target_constraints):
    """Merge every constraint of `source_constraints` into `target_constraints`.

    Each source constraint is dispatched, according to its kind, to the matching
    `update_constraint_*` function, so constraints already present in the target are
    updated instead of duplicated.

    Raises:
        NotImplementedError: a constraint kind, or the location kind of an
            endpoint_location constraint, is not supported.
        Exception: a custom constraint field already exists in the target with a
            different value (custom fields are copied with raise_if_differs=True).
    """
    for source_constraint in source_constraints:
        constraint_kind = source_constraint.WhichOneof('constraint')
        if constraint_kind == 'custom':
            custom = source_constraint.custom
            constraint_type = custom.constraint_type
            constraint_value = json.loads(custom.constraint_value)
            raise_if_differs = True
            fields = {name:(value, raise_if_differs) for name,value in constraint_value.items()}
            update_constraint_custom(target_constraints, constraint_type, fields)

        elif constraint_kind == 'endpoint_location':
            endpoint_id = source_constraint.endpoint_location.endpoint_id
            location = source_constraint.endpoint_location.location
            location_kind = location.WhichOneof('location')
            if location_kind == 'region':
                region = location.region
                update_constraint_endpoint_location(target_constraints, endpoint_id, region=region)
            elif location_kind == 'gps_position':
                gps_position = location.gps_position
                gps_position = (gps_position.latitude, gps_position.longitude)
                update_constraint_endpoint_location(target_constraints, endpoint_id, gps_position=gps_position)
            else:
                str_constraint = grpc_message_to_json_string(source_constraint)
                # Report the unsupported location kind (not the constraint kind); str() is
                # needed because the kind may not be a string when no location is set,
                # and '{:s}' would then fail before the intended error is raised.
                raise NotImplementedError(
                    'Constraint({:s}): Location({:s})'.format(str_constraint, str(location_kind)))

        elif constraint_kind == 'endpoint_priority':
            endpoint_id = source_constraint.endpoint_priority.endpoint_id
            priority = source_constraint.endpoint_priority.priority
            update_constraint_endpoint_priority(target_constraints, endpoint_id, priority)

        elif constraint_kind == 'sla_availability':
            sla_availability = source_constraint.sla_availability
            num_disjoint_paths = sla_availability.num_disjoint_paths
            all_active = sla_availability.all_active
            update_constraint_sla_availability(target_constraints, num_disjoint_paths, all_active)

        else:
            raise NotImplementedError('Constraint({:s})'.format(grpc_message_to_json_string(source_constraint)))