Skip to content
Snippets Groups Projects
Service.py 14.4 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional
from common.proto.context_pb2 import ContextId, Service, ServiceId, ServiceIdList, ServiceList
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from context.service.database.models.ServiceModel import ServiceModel

def service_list_ids(db_engine : Engine, request : ContextId) -> ServiceIdList:
    context_uuid = request.context_uuid.uuid
    def callback(session : Session) -> List[Dict]:
        obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
        #.options(selectinload(ContextModel.service)).filter_by(context_uuid=context_uuid).one_or_none()
        return [obj.dump_id() for obj in obj_list]
    return ServiceIdList(service_ids=run_transaction(sessionmaker(bind=db_engine), callback))

def service_list_objs(db_engine : Engine, request : ContextId) -> ServiceList:
    context_uuid = request.context_uuid.uuid
    def callback(session : Session) -> List[Dict]:
        obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
        #.options(selectinload(ContextModel.service)).filter_by(context_uuid=context_uuid).one_or_none()
        return [obj.dump() for obj in obj_list]
    return ServiceList(services=run_transaction(sessionmaker(bind=db_engine), callback))

def service_get(db_engine : Engine, request : ServiceId) -> Service:
    context_uuid = request.context_id.context_uuid.uuid
    service_uuid = request.service_uuid.uuid

    def callback(session : Session) -> Optional[Dict]:
        obj : Optional[ServiceModel] = session.query(ServiceModel)\
            .filter_by(context_uuid=context_uuid, service_uuid=service_uuid).one_or_none()
        return None if obj is None else obj.dump()
    obj = run_transaction(sessionmaker(bind=db_engine), callback)
    if obj is None:
        obj_uuid = '{:s}/{:s}'.format(context_uuid, service_uuid)
        raise NotFoundException('Service', obj_uuid)
    return Service(**obj)

def service_set(db_engine : Engine, request : Service) -> bool:
    context_uuid = request.service_id.context_id.context_uuid.uuid
    service_uuid = request.service_id.service_uuid.uuid
    service_name = request.name

    for i,endpoint_id in enumerate(request.service_endpoint_ids):
        endpoint_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        if len(endpoint_context_uuid) > 0 and context_uuid != endpoint_context_uuid:
            raise InvalidArgumentException(
                'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                endpoint_context_uuid,
                ['should be == {:s}({:s})'.format('request.service_id.context_id.context_uuid.uuid', context_uuid)])


    def callback(session : Session) -> None:
        service_data = [{
            'context_uuid' : context_uuid,
            'service_uuid': service_uuid,
            'service_name': service_name,
            'created_at'   : time.time(),
        }]
        stmt = insert(ServiceModel).values(service_data)
        stmt = stmt.on_conflict_do_update(
            index_elements=[ServiceModel.context_uuid, ServiceModel.service_uuid],
            set_=dict(service_name = stmt.excluded.service_name)
        )
        session.execute(stmt)

    run_transaction(sessionmaker(bind=db_engine), callback)
    return False # TODO: improve and check if created/updated


#        # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#        db_context = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none()
#        # str_service_key = key_to_str([context_uuid, service_uuid])
#        constraints_result = self.set_constraints(service_uuid, 'constraints', request.service_constraints)
#        db_constraints = constraints_result[0][0]
#
#        config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
#        running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
#        db_running_config = running_config_result[0][0]
#
#        result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
#            'context_fk'            : db_context,
#            'service_uuid'          : service_uuid,
#            'service_type'          : grpc_to_enum__service_type(request.service_type),
#            'service_constraints_fk': db_constraints,
#            'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
#            'service_config_fk'     : db_running_config,
#        })
#        db_service, updated = result
#
#        for i,endpoint_id in enumerate(request.service_endpoint_ids):
#            endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#            endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#            endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#            str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#            db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#            str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
#            result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
#                self.database, ServiceEndPointModel, str_service_endpoint_key, {
#                    'service_fk': db_service, 'endpoint_fk': db_endpoint})
#            #db_service_endpoint, service_endpoint_created = result
#
#        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#        dict_service_id = db_service.dump_id()
#        notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
#        return ServiceId(**dict_service_id)
#    context_uuid = request.service_id.context_id.context_uuid.uuid
#    db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#    for i,endpoint_id in enumerate(request.service_endpoint_ids):
#        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#        if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#            raise InvalidArgumentException(
#                'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                endpoint_topology_context_uuid,
#                ['should be == {:s}({:s})'.format(
#                    'request.service_id.context_id.context_uuid.uuid', context_uuid)])
#
#    service_uuid = request.service_id.service_uuid.uuid
#    str_service_key = key_to_str([context_uuid, service_uuid])
#
#    constraints_result = set_constraints(
#        self.database, str_service_key, 'service', request.service_constraints)
#    db_constraints = constraints_result[0][0]
#
#    running_config_rules = update_config(
#        self.database, str_service_key, 'service', request.service_config.config_rules)
#    db_running_config = running_config_rules[0][0]
#
#    result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
#        'context_fk'            : db_context,
#        'service_uuid'          : service_uuid,
#        'service_type'          : grpc_to_enum__service_type(request.service_type),
#        'service_constraints_fk': db_constraints,
#        'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
#        'service_config_fk'     : db_running_config,
#    })
#    db_service, updated = result
#
#    for i,endpoint_id in enumerate(request.service_endpoint_ids):
#        endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#        endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#        endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#        str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#        db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#        str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
#        result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
#            self.database, ServiceEndPointModel, str_service_endpoint_key, {
#                'service_fk': db_service, 'endpoint_fk': db_endpoint})
#        #db_service_endpoint, service_endpoint_created = result
#
#    event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#    dict_service_id = db_service.dump_id()
#    notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
#    return ServiceId(**dict_service_id)


#    def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
#    ) -> Tuple[Union_ConstraintModel, bool]:
#        with self.session() as session:
#
#            grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
#            parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
#            if parser is None:
#                raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
#                    grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
#            # create specific constraint
#            constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
#            str_constraint_id = str(uuid.uuid4())
#            LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
#            # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
#            # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
#            # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
#            #     database, constraint_class, str_constraint_key, constraint_data)
#            constraint_data[constraint_class.main_pk_name()] = str_constraint_id
#            db_new_constraint = constraint_class(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_specific_constraint, updated = result
#
#            # create generic constraint
#            # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
#            constraint_data = {
#                'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
#            }
#
#            db_new_constraint = ConstraintModel(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_constraint, updated = result
#
#            return db_constraint, updated
#
#    def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
#    ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
#        with self.session() as session:
#            # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
#            # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
#            result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            created = None
#            if result:
#                created = True
#            session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
#            session.add(db_constraints)
#
#            db_objects = [(db_constraints, created)]
#
#            for position,grpc_constraint in enumerate(grpc_constraints):
#                result : Tuple[ConstraintModel, bool] = self.set_constraint(
#                    db_constraints, grpc_constraint, position)
#                db_constraint, updated = result
#                db_objects.append((db_constraint, updated))
#
#            return db_objects

def service_delete(db_engine : Engine, request : ServiceId) -> bool:
    context_uuid = request.context_id.context_uuid.uuid
    service_uuid = request.service_uuid.uuid
    def callback(session : Session) -> bool:
        num_deleted = session.query(ServiceModel)\
            .filter_by(context_uuid=context_uuid, service_uuid=service_uuid).delete()
        return num_deleted > 0
    return run_transaction(sessionmaker(bind=db_engine), callback)

    # def delete(self) -> None:
    #     from .RelationModels import ServiceEndPointModel
    #     for db_service_endpoint_pk,_ in self.references(ServiceEndPointModel):
    #         ServiceEndPointModel(self.database, db_service_endpoint_pk).delete()
    #     super().delete()
    #     ConfigModel(self.database, self.service_config_fk).delete()
    #     ConstraintsModel(self.database, self.service_constraints_fk).delete()