Skip to content
ContextServiceServicerImpl.py 66.3 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import grpc, json, logging, operator, sqlalchemy, threading, uuid
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
from sqlalchemy.dialects.postgresql import UUID, insert
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty, EventTypeEnum,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
    ConfigActionEnum, Constraint)
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.service.Database import Database
from context.service.database.ConfigModel import (
    ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import (
    ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
    DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
from context.service.database.Events import notify_event
from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type
from context.service.database.LinkModel import LinkModel
from context.service.database.PolicyRuleModel import PolicyRuleModel
from context.service.database.RelationModels import (
    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
    SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
from context.service.database.ServiceModel import (
    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
from context.service.database.TopologyModel import TopologyModel
from .Constants import (
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
    TOPIC_TOPOLOGY)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Service name and RPC method names handed to create_metrics() below.
# Each row groups the methods of one entity type; the last two rows list the
# policy-rule methods (no events method) and the Unset* methods.
# NOTE(review): presumably every method decorated with
# safe_and_metered_rpc_method(METRICS, ...) must appear here -- confirm in
# common.rpc_method_wrapper.Decorator.
SERVICE_NAME = 'Context'
METHOD_NAMES = [
    'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
    'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
    'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    'UnsetService',      'UnsetSlice',
]
# Metrics object passed to the safe_and_metered_rpc_method decorator on each RPC handler.
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        """Keep references to the collaborators used by the RPC handlers.

        Args:
            db_engine: SQLAlchemy engine; the handlers bind a sessionmaker to it on every call.
            messagebroker: message broker instance stored on the servicer.
        """
        LOGGER.debug('Creating Servicer...')
        self.messagebroker = messagebroker
        self.db_engine = db_engine
        # Previous per-servicer lock/session/Database wiring, currently disabled:
        #self.lock = threading.Lock()
        #session = sessionmaker(bind=db_engine, expire_on_commit=False)
        #self.session = session
        #self.database = Database(session)
        LOGGER.debug('Servicer Created')

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the identifiers of all contexts stored in the database.

        Args:
            request: unused (Empty message).
            context: gRPC servicer context (unused here).

        Returns:
            ContextIdList built from the dump_id() of every ContextModel row.
        """
        def collect_ids(session : Session) -> List[Dict]:
            # Executed by run_transaction with a session bound to the engine.
            context_ids : List[Dict] = []
            for context_obj in session.query(ContextModel).all():
                context_ids.append(context_obj.dump_id())
            return context_ids

        session_factory = sessionmaker(bind=self.db_engine)
        return ContextIdList(context_ids=run_transaction(session_factory, collect_ids))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return all contexts stored in the database.

        Args:
            request: unused (Empty message).
            context: gRPC servicer context (unused here).

        Returns:
            ContextList built from the dump() of every ContextModel row.
        """
        def collect_contexts(session : Session) -> List[Dict]:
            # Executed by run_transaction with a session bound to the engine.
            rows : List[ContextModel] = session.query(ContextModel).all()
            return [row.dump() for row in rows]

        contexts = run_transaction(sessionmaker(bind=self.db_engine), collect_contexts)
        return ContextList(contexts=contexts)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Retrieve a single context.

        Args:
            request: identifier of the context to look up.
            context: gRPC servicer context (unused here).

        Returns:
            Context message built from the stored row's dump().

        Raises:
            NotFoundException: when no row matches the derived UUID.
        """
        # Rows are looked up by a UUIDv5 derived from the identifier in the request.
        context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))

        def fetch_context(session : Session) -> Optional[Dict]:
            query = session.query(ContextModel).filter_by(context_uuid=context_uuid)
            found : Optional[ContextModel] = query.one_or_none()
            if found is None:
                return None
            return found.dump()

        context_data = run_transaction(sessionmaker(bind=self.db_engine), fetch_context)
        if context_data is not None:
            return Context(**context_data)
        raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create a context or update an existing one.

        Args:
            request: context to store; the identifier in request.context_id is kept as the
                context name and a UUIDv5 derived from it is used as the row key.
            context: gRPC servicer context (unused here).

        Returns:
            ContextId of the stored context.

        Raises:
            InvalidArgumentException: when a topology or service id in the request refers
                to a context other than request.context_id.
            NotFoundException: when the row cannot be read back after being merged.
        """
        context_name = request.context_id.context_uuid.uuid
        context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, context_name))

        # Referenced ids carry the identifier exactly as the caller supplied it, so they
        # must be compared with the supplied identifier (context_name), not with the
        # derived UUIDv5: comparing against the hash rejected every non-empty list.
        referenced_ids = (
            ('topology_ids', request.topology_ids),
            ('service_ids', request.service_ids),
        )
        for field_name, entity_ids in referenced_ids:
            for i, entity_id in enumerate(entity_ids):
                entity_context_uuid = entity_id.context_id.context_uuid.uuid
                if entity_context_uuid != context_name:
                    raise InvalidArgumentException(
                        'request.{:s}[{:d}].context_id.context_uuid.uuid'.format(field_name, i),
                        entity_context_uuid,
                        ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_name)])

        def callback(session : Session) -> Tuple[Optional[Dict], bool]:
            # Lock the row (if any) so the created/updated decision is not raced.
            existing : Optional[ContextModel] = \
                session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none()
            updated = existing is not None
            session.merge(ContextModel(context_uuid=context_uuid, context_name=context_name))
            # run_transaction owns commit/rollback (and may retry the callback), so only
            # flush here to push the pending row before reading it back.
            session.flush()
            obj = session.get(ContextModel, {'context_uuid': context_uuid})
            return (None if obj is None else obj.dump_id()), updated

        obj_id, updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

        # Event notification is currently disabled:
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': obj_id})
        return ContextId(**obj_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Delete the context identified by the request, if it exists.

        Args:
            request: identifier of the context to delete.
            context: gRPC servicer context (unused here).

        Returns:
            Empty, whether or not a row was deleted.
        """
        # Rows are keyed by a UUIDv5 derived from the identifier in the request.
        context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))

        def delete_context(session : Session) -> bool:
            matching = session.query(ContextModel).filter_by(context_uuid=context_uuid)
            removed_rows = matching.delete()
            return removed_rows > 0

        session_factory = sessionmaker(bind=self.db_engine)
        deleted = run_transaction(session_factory, delete_context)
        # Event notification is currently disabled, so `deleted` is not used yet:
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
        return Empty()

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
#        for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
#            yield ContextEvent(**json.loads(message.content))


    # ----- Topology ---------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
#        context_uuid = request.context_uuid.uuid
#
#        with self.session() as session:
#            result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
#            if not result:
#                raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#
#            db_topologies = result.topology
#            return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
#        context_uuid = request.context_uuid.uuid
#
#        with self.session() as session:
#            result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(
#                context_uuid=context_uuid).one_or_none()
#            if not result:
#                raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#
#            db_topologies = result.topology
#            return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies])
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Loading
Loading full blame…