Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, operator, sqlalchemy, threading, time, uuid
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
from sqlalchemy.dialects.postgresql import UUID, insert
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
Context, ContextEvent, ContextId, ContextIdList, ContextList,
Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
Empty, EventTypeEnum,
Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
ConfigActionEnum, Constraint)
#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.tools.object_factory.Context import json_context_id
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import (
InvalidArgumentException, NotFoundException, OperationFailedException)
#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
#from context.service.Database import Database
#from context.service.database.ConfigModel import (
# ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
#from context.service.database.ConnectionModel import ConnectionModel, set_path
#from context.service.database.ConstraintModel import (
# ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
DeviceModel, grpc_to_enum__device_operational_status, grpc_to_enum__device_driver)
from context.service.database.EndPointModel import EndPointModel, grpc_to_enum__kpi_sample_type
#from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
#from context.service.database.Events import notify_event
#from context.service.database.LinkModel import LinkModel
#from context.service.database.PolicyRuleModel import PolicyRuleModel
from context.service.database.RelationModels import TopologyDeviceModel
# ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
#from context.service.database.ServiceModel import (
# ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
from context.service.database.TopologyModel import TopologyModel
#from .Constants import (
# CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
# TOPIC_TOPOLOGY)
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Context'
METHOD_NAMES = [
'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
'ListContextIds', 'ListContexts', 'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents',
'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents',
'ListDeviceIds', 'ListDevices', 'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents',
'ListLinkIds', 'ListLinks', 'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents',
'ListServiceIds', 'ListServices', 'GetService', 'SetService', 'RemoveService', 'GetServiceEvents',
'ListSliceIds', 'ListSlices', 'GetSlice', 'SetSlice', 'RemoveSlice', 'GetSliceEvents',
'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
'UnsetService', 'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
LOGGER.debug('Creating Servicer...')
self.db_engine = db_engine
#self.lock = threading.Lock()
#session = sessionmaker(bind=db_engine, expire_on_commit=False)
#self.session = session
#self.database = Database(session)
self.messagebroker = messagebroker
LOGGER.debug('Servicer Created')
# ----- Context ----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
def callback(session : Session) -> List[Dict]:
obj_list : List[ContextModel] = session.query(ContextModel).all()
#.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
return [obj.dump_id() for obj in obj_list]
return ContextIdList(context_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
def callback(session : Session) -> List[Dict]:
obj_list : List[ContextModel] = session.query(ContextModel).all()
#.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
return [obj.dump() for obj in obj_list]
return ContextList(contexts=run_transaction(sessionmaker(bind=self.db_engine), callback))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
def callback(session : Session) -> Optional[Dict]:
obj : Optional[ContextModel] = session.query(ContextModel)\
.filter_by(context_uuid=context_uuid).one_or_none()
return None if obj is None else obj.dump()
obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
if obj is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
return Context(**obj)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
context_uuid = request.context_id.context_uuid.uuid
context_name = request.name
for i, topology_id in enumerate(request.topology_ids):
topology_context_uuid = topology_id.context_id.context_uuid.uuid
if topology_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
for i, service_id in enumerate(request.service_ids):
service_context_uuid = service_id.context_id.context_uuid.uuid
if service_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
for i, slice_id in enumerate(request.slice_ids):
slice_context_uuid = slice_id.context_id.context_uuid.uuid
if slice_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.slice_ids[{:d}].context_id.context_uuid.uuid'.format(i), slice_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
def callback(session : Session) -> Tuple[Optional[Dict], bool]:
obj : Optional[ContextModel] = session.query(ContextModel).with_for_update()\
.filter_by(context_uuid=context_uuid).one_or_none()
is_update = obj is not None
if is_update:
obj.context_name = context_name
session.merge(obj)
else:
session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time()))
obj : Optional[ContextModel] = session.query(ContextModel)\
.filter_by(context_uuid=context_uuid).one_or_none()
return (None if obj is None else obj.dump_id()), is_update
obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': obj_id})
return ContextId(**obj_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
def callback(session : Session) -> bool:
num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
return num_deleted > 0
deleted = run_transaction(sessionmaker(bind=self.db_engine), callback)
#if deleted:
# notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
return Empty()
def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
#for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
# yield ContextEvent(**json.loads(message.content))
#cf = ChangeFeedClient()
#ready = cf.initialize()
#if not ready: raise OperationFailedException('Initialize ChangeFeed')
#for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
# if is_delete:
# event_type = EventTypeEnum.EVENTTYPE_REMOVE
# else:
# is_create = (timestamp - after.get('created_at')) < 1.0
# event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
# event = {
# 'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
# 'context_id': json_context_id(primary_key[0]),
# }
# yield ContextEvent(**event)
Loading
Loading full blame…