Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, operator
from typing import Dict, List, Optional, Set, Tuple, Union
from common.orm.Database import Database
from common.orm.backend.Tools import key_to_str
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.model.Model import Model
from common.orm.HighLevel import get_object, get_or_create_object, get_related_objects, update_or_create_object
from common.proto.context_pb2 import EndPointId
from .EndPointModel import EndPointModel
from .ServiceModel import ServiceModel
from sqlalchemy import Column, ForeignKey #, ForeignKeyConstraint
#from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from ._Base import _Base
def remove_dict_key(dictionary : Dict, key : str):
dictionary.pop(key, None)
return dictionary
from sqlalchemy import Column, Enum, ForeignKey, Integer, CheckConstraint
from typing import Dict, List
from common.orm.HighLevel import get_related_objects
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
from .Tools import grpc_to_enum
from sqlalchemy.dialects.postgresql import UUID
from context.service.database.models._Base import Base
LOGGER = logging.getLogger(__name__)
class PathModel(Model): # pylint: disable=abstract-method
path_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
def delete(self) -> None:
for db_path_hop_pk,_ in self.references(PathHopModel):
PathHopModel(self.database, db_path_hop_pk).delete()
super().delete()
def dump(self) -> List[Dict]:
db_path_hop_pks = self.references(PathHopModel)
path_hops = [PathHopModel(self.database, pk).dump(include_position=True) for pk,_ in db_path_hop_pks]
path_hops = sorted(path_hops, key=operator.itemgetter('position'))
return [remove_dict_key(path_hop, 'position') for path_hop in path_hops]
class PathHopModel(Model): # pylint: disable=abstract-method
path_hop_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
path_uuid = Column(UUID(as_uuid=False), ForeignKey("Path.path_uuid"))
position = Column(Integer, CheckConstraint('position >= 0'), nullable=False)
endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"))
def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
db_endpoint : EndPointModel = EndPointModel(self.database, self.endpoint_fk)
result = db_endpoint.dump_id()
if include_position: result['position'] = self.position
return result
class ConnectionModel(Model):
pk = PrimaryKeyField()
# connection_uuid = StringField(required=True, allow_empty=False)
connection_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
# service_fk = ForeignKeyField(ServiceModel, required=False)
service_uuid = Column(UUID(as_uuid=False), ForeignKey("Service.service_uuid"))
path_fk = ForeignKeyField(PathModel, required=True)
def delete(self) -> None:
# pylint: disable=import-outside-toplevel
from .RelationModels import ConnectionSubServiceModel
# Do not remove sub-services automatically. They are supported by real services, so Service component should
# deal with the correct removal workflow to deconfigure the devices.
for db_connection_sub_service_pk,_ in self.references(ConnectionSubServiceModel):
ConnectionSubServiceModel(self.database, db_connection_sub_service_pk).delete()
super().delete()
PathModel(self.database, self.path_fk).delete()
def dump_id(self) -> Dict:
return {
'connection_uuid': {'uuid': self.connection_uuid},
}
def dump_path_hops_endpoint_ids(self) -> List[Dict]:
return PathModel(self.database, self.path_fk).dump()
def dump_sub_service_ids(self) -> List[Dict]:
from .RelationModels import ConnectionSubServiceModel # pylint: disable=import-outside-toplevel
db_sub_services = get_related_objects(self, ConnectionSubServiceModel, 'sub_service_fk')
return [db_sub_service.dump_id() for db_sub_service in sorted(db_sub_services, key=operator.attrgetter('pk'))]
def dump(self, include_path=True, include_sub_service_ids=True) -> Dict: # pylint: disable=arguments-differ
result = {'connection_id': self.dump_id()}
if self.service_fk is not None:
result['service_id'] = ServiceModel(self.database, self.service_fk).dump_id()
if include_path: result['path_hops_endpoint_ids'] = self.dump_path_hops_endpoint_ids()
if include_sub_service_ids: result['sub_service_ids'] = self.dump_sub_service_ids()
return result
# class ConnectionSubServiceModel(Model):
# pk = PrimaryKeyField()
# connection_fk = ForeignKeyField(ConnectionModel)
# sub_service_fk = ForeignKeyField(ServiceModel)
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def set_path_hop(
database : Database, db_path : PathModel, position : int, db_endpoint : EndPointModel
) -> Tuple[PathHopModel, bool]:
str_path_hop_key = key_to_str([db_path.pk, db_endpoint.pk], separator=':')
result : Tuple[PathHopModel, bool] = update_or_create_object(database, PathHopModel, str_path_hop_key, {
'path_fk': db_path, 'position': position, 'endpoint_fk': db_endpoint})
db_path_hop, updated = result
return db_path_hop, updated
def delete_path_hop(
database : Database, db_path : PathModel, db_path_hop_pk : str
) -> None:
db_path_hop : Optional[PathHopModel] = get_object(database, PathHopModel, db_path_hop_pk, raise_if_not_found=False)
if db_path_hop is None: return
db_path_hop.delete()
def delete_all_path_hops(
database : Database, db_path : PathHopModel
) -> None:
db_path_hop_pks = db_path.references(PathHopModel)
for pk,_ in db_path_hop_pks: PathHopModel(database, pk).delete()
def set_path(
database : Database, connection_uuid : str, raw_endpoint_ids : List[EndPointId], path_name : str = ''
) -> List[Union[PathModel, PathHopModel]]:
str_path_key = connection_uuid if len(path_name) == 0 else key_to_str([connection_uuid, path_name], separator=':')
result : Tuple[PathModel, bool] = get_or_create_object(database, PathModel, str_path_key)
db_path, created = result # pylint: disable=unused-variable
db_path_hop_pks : Set[str] = set(map(operator.itemgetter(0), db_path.references(PathHopModel)))
db_objects : List[Tuple[Union[PathModel, PathHopModel], bool]] = [db_path]
for position,endpoint_id in enumerate(raw_endpoint_ids):
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(database, EndPointModel, str_endpoint_key)
result : Tuple[PathHopModel, bool] = set_path_hop(database, db_path, position, db_endpoint)
db_path_hop, updated = result # pylint: disable=unused-variable
db_objects.append(db_path_hop)
db_path_hop_pks.discard(db_path_hop.instance_key)
for db_path_hop_pk in db_path_hop_pks: delete_path_hop(database, db_path, db_path_hop_pk)
return db_objects