# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, operator
from enum import Enum
from typing import Dict, List, Optional, Tuple, Type, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.orm.fields.BooleanField import BooleanField
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.FloatField import FloatField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import Constraint
from common.tools.grpc.Tools import grpc_message_to_json_string
from .EndPointModel import EndPointModel, get_endpoint
from .Tools import fast_hasher, remove_dict_key

LOGGER = logging.getLogger(__name__)


class ConstraintsModel(Model): # pylint: disable=abstract-method
    """Container that groups the ConstraintModel entries referencing it."""
    pk = PrimaryKeyField()

    def delete(self) -> None:
        """Delete every ConstraintModel referencing this container, then the container itself."""
        db_constraint_pks = self.references(ConstraintModel)
        for pk,_ in db_constraint_pks:
            ConstraintModel(self.database, pk).delete()
        super().delete()

    def dump(self) -> List[Dict]:
        """Return the dumps of the referencing constraints, ordered by their 'position'.

        The 'position' key is only used for sorting; it is removed from each dumped
        constraint before returning.
        """
        db_constraint_pks = self.references(ConstraintModel)
        constraints = [
            ConstraintModel(self.database, pk).dump(include_position=True)
            for pk,_ in db_constraint_pks
        ]
        constraints = sorted(constraints, key=operator.itemgetter('position'))
        return [remove_dict_key(constraint, 'position') for constraint in constraints]


class ConstraintCustomModel(Model): # pylint: disable=abstract-method
    """Specific constraint holding a free-form (constraint_type, constraint_value) pair."""
    constraint_type = StringField(required=True, allow_empty=False)
    constraint_value = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the constraint as a dict nested under the 'custom' key."""
        return {'custom': {'constraint_type': self.constraint_type, 'constraint_value': self.constraint_value}}


class ConstraintEndpointLocationRegionModel(Model): # pylint: disable=abstract-method
    """Specific constraint associating an endpoint with a region name."""
    endpoint_fk = ForeignKeyField(EndPointModel)
    region = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the constraint as a dict nested under the 'endpoint_location' key."""
        json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
        return {'endpoint_location': {'endpoint_id': json_endpoint_id, 'location': {'region': self.region}}}


class ConstraintEndpointLocationGpsPositionModel(Model): # pylint: disable=abstract-method
    """Specific constraint associating an endpoint with a latitude/longitude pair."""
    endpoint_fk = ForeignKeyField(EndPointModel)
    latitude = FloatField(required=True, min_value=-90.0, max_value=90.0)
    longitude = FloatField(required=True, min_value=-180.0, max_value=180.0)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the constraint as a dict nested under the 'endpoint_location' key."""
        gps_position = {'latitude': self.latitude, 'longitude': self.longitude}
        json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
        return {'endpoint_location': {'endpoint_id': json_endpoint_id, 'location': {'gps_position': gps_position}}}


class ConstraintEndpointPriorityModel(Model): # pylint: disable=abstract-method
    """Specific constraint associating an endpoint with a non-negative priority."""
    endpoint_fk = ForeignKeyField(EndPointModel)
    priority = IntegerField(required=True, min_value=0)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the constraint as a dict nested under the 'endpoint_priority' key."""
        json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
        return {'endpoint_priority': {'endpoint_id': json_endpoint_id, 'priority': self.priority}}


class ConstraintSlaAvailabilityModel(Model): # pylint: disable=abstract-method
    """Specific constraint holding the SLA availability parameters."""
    num_disjoint_paths = IntegerField(required=True, min_value=1)
    all_active = BooleanField(required=True)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the constraint as a dict nested under the 'sla_availability' key."""
        return {'sla_availability': {'num_disjoint_paths': self.num_disjoint_paths, 'all_active': self.all_active}}


# enum values should match name of field in ConstraintModel
# (ConstraintModel builds the field name as 'constraint_<value>_fk')
class ConstraintKindEnum(Enum):
    CUSTOM                        = 'custom'
    ENDPOINT_LOCATION_REGION      = 'ep_loc_region'
    ENDPOINT_LOCATION_GPSPOSITION = 'ep_loc_gpspos'
    ENDPOINT_PRIORITY             = 'ep_priority'
    SLA_AVAILABILITY              = 'sla_avail'


Union_SpecificConstraint = Union[
    ConstraintCustomModel, ConstraintEndpointLocationRegionModel, ConstraintEndpointLocationGpsPositionModel,
    ConstraintEndpointPriorityModel, ConstraintSlaAvailabilityModel,
]


class ConstraintModel(Model): # pylint: disable=abstract-method
    """Generic constraint entry: belongs to a ConstraintsModel, has a position and a kind,
    and points to exactly one kind-specific constraint model through 'constraint_<kind>_fk'.
    """
    pk = PrimaryKeyField()
    constraints_fk = ForeignKeyField(ConstraintsModel)
    kind = EnumeratedField(ConstraintKindEnum)
    position = IntegerField(min_value=0, required=True)
    constraint_custom_fk        = ForeignKeyField(ConstraintCustomModel, required=False)
    constraint_ep_loc_region_fk = ForeignKeyField(ConstraintEndpointLocationRegionModel, required=False)
    constraint_ep_loc_gpspos_fk = ForeignKeyField(ConstraintEndpointLocationGpsPositionModel, required=False)
    constraint_ep_priority_fk   = ForeignKeyField(ConstraintEndpointPriorityModel, required=False)
    constraint_sla_avail_fk     = ForeignKeyField(ConstraintSlaAvailabilityModel, required=False)

    def _resolve_specific_constraint(self) -> Tuple[Type[Model], str]:
        """Locate the kind-specific constraint this entry points to.

        Returns:
            (foreign model class, string key of the specific constraint).

        Raises:
            Exception: when the foreign key field matching this entry's kind holds no value.
        """
        field_name = 'constraint_{:s}_fk'.format(str(self.kind.value))
        specific_fk_value : Optional[ForeignKeyField] = getattr(self, field_name, None)
        if specific_fk_value is None:
            raise Exception('Unable to find constraint key for field_name({:s})'.format(field_name))
        # Accessing the attribute on the class (not the instance) yields the field definition,
        # which exposes the model class the foreign key targets.
        specific_fk_class = getattr(ConstraintModel, field_name, None)
        foreign_model_class : Type[Model] = specific_fk_class.foreign_model
        return foreign_model_class, str(specific_fk_value)

    def delete(self) -> None:
        """Delete this entry and then the kind-specific constraint it points to.

        Raises:
            Exception: when the foreign key field matching this entry's kind holds no value.
        """
        # Resolve the target before deleting self; the generic entry is removed first,
        # then the specific constraint it referenced.
        foreign_model_class, str_specific_key = self._resolve_specific_constraint()
        super().delete()
        get_object(self.database, foreign_model_class, str_specific_key).delete()

    def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
        """Return the dump of the kind-specific constraint.

        Args:
            include_position: when true, add this entry's 'position' to the returned dict.

        Raises:
            Exception: when the foreign key field matching this entry's kind holds no value.
        """
        foreign_model_class, str_specific_key = self._resolve_specific_constraint()
        constraint : Union_SpecificConstraint = get_object(self.database, foreign_model_class, str_specific_key)
        result = constraint.dump()
        if include_position:
            result['position'] = self.position
        return result


# (specific model class, constraint identifier used to build the key, field data, constraint kind)
Tuple_ConstraintSpecs = Tuple[Type, str, Dict, ConstraintKindEnum]


def parse_constraint_custom(database : Database, grpc_constraint) -> Tuple_ConstraintSpecs:
    """Build the specs of a 'custom' constraint; the constraint type doubles as its identifier."""
    constraint_class = ConstraintCustomModel
    str_constraint_id = grpc_constraint.custom.constraint_type
    constraint_data = {
        'constraint_type' : grpc_constraint.custom.constraint_type,
        'constraint_value': grpc_constraint.custom.constraint_value,
    }
    return constraint_class, str_constraint_id, constraint_data, ConstraintKindEnum.CUSTOM


def parse_constraint_endpoint_location(database : Database, grpc_constraint) -> Tuple_ConstraintSpecs:
    """Build the specs of an 'endpoint_location' constraint (region or GPS position).

    The endpoint key returned by get_endpoint is used as the constraint identifier.

    Raises:
        NotImplementedError: when the 'location' oneof is neither 'region' nor 'gps_position'.
    """
    grpc_endpoint_id = grpc_constraint.endpoint_location.endpoint_id
    str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)

    str_constraint_id = str_endpoint_key
    constraint_data = {'endpoint_fk': db_endpoint}

    grpc_location = grpc_constraint.endpoint_location.location
    location_kind = str(grpc_location.WhichOneof('location'))
    if location_kind == 'region':
        constraint_class = ConstraintEndpointLocationRegionModel
        constraint_data.update({'region': grpc_location.region})
        return constraint_class, str_constraint_id, constraint_data, ConstraintKindEnum.ENDPOINT_LOCATION_REGION
    elif location_kind == 'gps_position':
        constraint_class = ConstraintEndpointLocationGpsPositionModel
        gps_position = grpc_location.gps_position
        constraint_data.update({'latitude': gps_position.latitude, 'longitude': gps_position.longitude})
        return constraint_class, str_constraint_id, constraint_data, ConstraintKindEnum.ENDPOINT_LOCATION_GPSPOSITION
    else:
        MSG = 'Location kind {:s} in Constraint of kind endpoint_location is not implemented: {:s}'
        raise NotImplementedError(MSG.format(location_kind, grpc_message_to_json_string(grpc_constraint)))


def parse_constraint_endpoint_priority(database : Database, grpc_constraint) -> Tuple_ConstraintSpecs:
    """Build the specs of an 'endpoint_priority' constraint; the endpoint key is its identifier."""
    grpc_endpoint_id = grpc_constraint.endpoint_priority.endpoint_id
    str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)

    constraint_class = ConstraintEndpointPriorityModel
    str_constraint_id = str_endpoint_key
    priority = grpc_constraint.endpoint_priority.priority
    constraint_data = {'endpoint_fk': db_endpoint, 'priority': priority}

    return constraint_class, str_constraint_id, constraint_data, ConstraintKindEnum.ENDPOINT_PRIORITY


def parse_constraint_sla_availability(database : Database, grpc_constraint) -> Tuple_ConstraintSpecs:
    """Build the specs of an 'sla_availability' constraint; its identifier is the empty string."""
    constraint_class = ConstraintSlaAvailabilityModel
    str_constraint_id = ''
    constraint_data = {
        'num_disjoint_paths' : grpc_constraint.sla_availability.num_disjoint_paths,
        'all_active': grpc_constraint.sla_availability.all_active,
    }
    return constraint_class, str_constraint_id, constraint_data, ConstraintKindEnum.SLA_AVAILABILITY


# Maps the name of the gRPC 'constraint' oneof field to the function that parses it.
CONSTRAINT_PARSERS = {
    'custom'            : parse_constraint_custom,
    'endpoint_location' : parse_constraint_endpoint_location,
    'endpoint_priority' : parse_constraint_endpoint_priority,
    'sla_availability'  : parse_constraint_sla_availability,
}

Union_ConstraintModel = Union[
    ConstraintCustomModel, ConstraintEndpointLocationGpsPositionModel, ConstraintEndpointLocationRegionModel,
    ConstraintEndpointPriorityModel, ConstraintSlaAvailabilityModel
]


def set_constraint(
    database : Database, db_constraints : ConstraintsModel, grpc_constraint : Constraint, position : int
) -> Tuple[ConstraintModel, bool]:
    """Create or update one constraint (specific payload + generic entry) inside db_constraints.

    Both objects are stored under the same key, built from the container's pk and a hash of
    '<kind value>:<constraint identifier>'.

    Args:
        database: database handle passed to the ORM helpers.
        db_constraints: container the constraint belongs to.
        grpc_constraint: gRPC Constraint message to store.
        position: position of the constraint within the container.

    Returns:
        (generic ConstraintModel, flag). The flag combines the flags reported by
        update_or_create_object for the specific and the generic object, so a change
        affecting only the specific payload is also reported.

    Raises:
        NotImplementedError: when no parser is registered for the message's 'constraint' oneof.
    """
    grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))

    parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
    if parser is None:
        raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
            grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))

    # create specific constraint
    constraint_class, str_constraint_id, specific_data, constraint_kind = parser(database, grpc_constraint)
    str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
    str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')

    specific_result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
        database, constraint_class, str_constraint_key, specific_data)
    db_specific_constraint, specific_updated = specific_result

    # create generic constraint
    constraint_fk_field_name = 'constraint_{:s}_fk'.format(constraint_kind.value)
    generic_data = {
        'constraints_fk': db_constraints, 'position': position, 'kind': constraint_kind,
        constraint_fk_field_name: db_specific_constraint
    }
    generic_result : Tuple[ConstraintModel, bool] = update_or_create_object(
        database, ConstraintModel, str_constraint_key, generic_data)
    db_constraint, generic_updated = generic_result

    # Previously the flag of the specific constraint was overwritten and lost; a change in
    # the specific payload alone (same kind/position/key) must still count as an update.
    return db_constraint, (specific_updated or generic_updated)


def set_constraints(
    database : Database, db_parent_pk : str, constraints_name : str, grpc_constraints
) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
    """Get or create the constraints container of a parent object and store its constraints.

    Args:
        database: database handle passed to the ORM helpers.
        db_parent_pk: key of the parent object; combined with constraints_name to key the container.
        constraints_name: name used as the first component of the container key.
        grpc_constraints: iterable of gRPC Constraint messages; the enumeration index is stored
            as each constraint's position.

    Returns:
        List of (object, flag) tuples: first the container with the flag returned by
        get_or_create_object, then one entry per constraint as returned by set_constraint.
    """
    str_constraints_key = key_to_str([constraints_name, db_parent_pk], separator=':')
    result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
    db_constraints, created = result

    db_objects = [(db_constraints, created)]

    for position,grpc_constraint in enumerate(grpc_constraints):
        db_constraint, updated = set_constraint(database, db_constraints, grpc_constraint, position)
        db_objects.append((db_constraint, updated))

    return db_objects