# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""gRPC servicer implementation for the Context service.

The active RPCs delegate to the helpers in ``.database.methods`` (Context, Topology and
Device) and stream events consumed from the message broker. The remaining RPC
implementations are kept below as commented-out reference code.
"""

import json
import logging

import grpc
import sqlalchemy
#from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
#from sqlalchemy.dialects.postgresql import UUID, insert
from typing import Iterator
from common.message_broker.MessageBroker import MessageBroker
#from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
#from common.tools.object_factory.Context import json_context_id
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
#from common.rpc_method_wrapper.ServiceExceptions import (
#    InvalidArgumentException, NotFoundException, OperationFailedException)
from .database.methods.Context import (
    context_delete, context_get, context_list_ids, context_list_objs, context_set)
from .database.methods.Device import (
    device_delete, device_get, device_list_ids, device_list_objs, device_set)
#from .database.methods.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
#from .database.methods.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set
from .database.methods.Topology import (
    topology_delete, topology_get, topology_list_ids, topology_list_objs, topology_set)
#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
#from context.service.Database import Database
#from context.service.database.ConfigModel import (
#    ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
#from context.service.database.ConnectionModel import ConnectionModel, set_path
#from context.service.database.ConstraintModel import (
#    ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
#from context.service.database.models.ContextModel import ContextModel
#from context.service.database.models.DeviceModel import (
#    DeviceModel, grpc_to_enum__device_operational_status, grpc_to_enum__device_driver)
#from context.service.database.models.EndPointModel import EndPointModel, grpc_to_enum__kpi_sample_type
#from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
#from context.service.database.Events import notify_event
#from context.service.database.LinkModel import LinkModel
#from context.service.database.PolicyRuleModel import PolicyRuleModel
#from context.service.database.RelationModels import TopologyDeviceModel
#    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
#    SliceSubSliceModel, TopologyLinkModel)
#from context.service.database.ServiceModel import (
#    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
#from context.service.database.TopologyModel import TopologyModel
from .Constants import (
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, #TOPIC_POLICY,
    TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY)
#from .ChangeFeedClient import ChangeFeedClient

LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'Context'

# RPC names handed to create_metrics(). The list also names RPCs whose implementations are
# currently commented out in the class below.
METHOD_NAMES = [
    'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
    'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
    'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    'UnsetService',      'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    """Servicer for the Context and ContextPolicy gRPC services.

    Context, Topology and Device RPCs delegate to the functions imported from
    ``.database.methods``, passing the engine received at construction time. The ``Get*Events``
    RPCs stream messages consumed from the message broker. The other Link, Service, Slice,
    Connection and Policy RPCs are only present as commented-out reference code
    (NOTE(review): presumably pending a port to the new database helpers -- confirm).
    """

    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        """Keep references to the database engine and the message broker.

        Args:
            db_engine: engine forwarded to every ``.database.methods`` helper.
            messagebroker: broker whose ``consume()`` feeds the ``Get*Events`` streams.
        """
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    def _get_metrics(self):
        """Return the module-level METRICS object."""
        return METRICS

    def _stream_events(self, topic, event_class) -> Iterator:
        """Yield one ``event_class`` instance per message consumed from ``topic``.

        Shared body of all ``Get*Events`` RPCs: every message content is parsed as JSON and
        the resulting mapping is expanded as keyword arguments of ``event_class``.

        Args:
            topic: one of the TOPIC_* constants; consumed as the single-element set ``{topic}``.
            event_class: callable (a ``*Event`` message class) building the yielded object.
        """
        for message in self.messagebroker.consume({topic}, consume_timeout=CONSUME_TIMEOUT):
            yield event_class(**json.loads(message.content))


    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the result of ``context_list_ids`` for the configured engine."""
        return context_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
        """Return the result of ``context_list_objs`` for the configured engine."""
        return context_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
        """Return the result of ``context_get`` for the requested ContextId."""
        return context_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
        """Pass ``request`` to ``context_set`` and return the context id it reports."""
        # NOTE: `updated` is only needed by the (currently disabled) event notification below.
        context_id,updated = context_set(self.db_engine, request)
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
        return context_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
        """Pass ``request`` to ``context_delete``; always return ``Empty()``."""
        # NOTE: `deleted` is only needed by the (currently disabled) event notification below.
        deleted = context_delete(self.db_engine, request)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Stream a ContextEvent for each message consumed from TOPIC_CONTEXT."""
        yield from self._stream_events(TOPIC_CONTEXT, ContextEvent)


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the result of ``topology_list_ids`` for the requested ContextId."""
        return topology_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the result of ``topology_list_objs`` for the requested ContextId."""
        return topology_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the result of ``topology_get`` for the requested TopologyId."""
        return topology_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
        """Pass ``request`` to ``topology_set`` and return the topology id it reports."""
        # NOTE: `updated` is only needed by the (currently disabled) event notification below.
        topology_id,updated = topology_set(self.db_engine, request)
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
        return topology_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Pass ``request`` to ``topology_delete``; always return ``Empty()``."""
        # NOTE: `deleted` is only needed by the (currently disabled) event notification below.
        deleted = topology_delete(self.db_engine, request)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Stream a TopologyEvent for each message consumed from TOPIC_TOPOLOGY."""
        yield from self._stream_events(TOPIC_TOPOLOGY, TopologyEvent)


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the result of ``device_list_ids`` for the configured engine."""
        return device_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return the result of ``device_list_objs`` for the configured engine."""
        return device_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Device:
        """Return the result of ``device_get`` for the requested DeviceId."""
        return device_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Pass ``request`` to ``device_set`` and return the device id it reports."""
        # NOTE: `updated` is only needed by the (currently disabled) event notification below.
        device_id,updated = device_set(self.db_engine, request)
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
        return device_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Pass ``request`` to ``device_delete``; always return ``Empty()``."""
        # NOTE: `deleted` is only needed by the (currently disabled) event notification below.
        deleted = device_delete(self.db_engine, request)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Stream a DeviceEvent for each message consumed from TOPIC_DEVICE."""
        yield from self._stream_events(TOPIC_DEVICE, DeviceEvent)


    # ----- Link -------------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
#        return link_list_ids(self.db_engine)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
#        return link_list_objs(self.db_engine)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
#        return link_get(self.db_engine, request)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
#        link_id,updated = link_set(self.db_engine, request)
#        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#        #notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
#        return link_id

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
#        deleted = link_delete(self.db_engine, request)
#        #if deleted:
#        #    notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
#        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Stream a LinkEvent for each message consumed from TOPIC_LINK."""
        yield from self._stream_events(TOPIC_LINK, LinkEvent)


    # ----- Service ----------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
#        return service_list_ids(self.db_engine, request)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
#        return service_list_objs(self.db_engine, request)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
#        return service_get(self.db_engine, request)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
#        service_id,updated = service_set(self.db_engine, request)
#        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#        #notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
#        return service_id

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
#        deleted = service_delete(self.db_engine, request)
#        #if deleted:
#        #    notify_event(self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'service_id': request})
#        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Stream a ServiceEvent for each message consumed from TOPIC_SERVICE."""
        yield from self._stream_events(TOPIC_SERVICE, ServiceEvent)


    # ----- Slice ----------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
#        with self.lock:
#            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
#            db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
#            db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
#            return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
#        with self.lock:
#            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
#            db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
#            db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
#            return SliceList(slices=[db_slice.dump() for db_slice in db_slices])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
#        with self.lock:
#            str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid])
#            db_slice : SliceModel = get_object(self.database, SliceModel, str_key)
#            return Slice(**db_slice.dump(
#                include_endpoint_ids=True, include_constraints=True, include_config_rules=True,
#                include_service_ids=True, include_subslice_ids=True))

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
#        with self.lock:
#            context_uuid = request.slice_id.context_id.context_uuid.uuid
#            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                    raise InvalidArgumentException(
#                        'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                        endpoint_topology_context_uuid,
#                        ['should be == {:s}({:s})'.format(
#                            'request.slice_id.context_id.context_uuid.uuid', context_uuid)])
#
#            slice_uuid = request.slice_id.slice_uuid.uuid
#            str_slice_key = key_to_str([context_uuid, slice_uuid])
#
#            constraints_result = set_constraints(
#                self.database, str_slice_key, 'slice', request.slice_constraints)
#            db_constraints = constraints_result[0][0]
#
#            running_config_rules = update_config(
#                self.database, str_slice_key, 'slice', request.slice_config.config_rules)
#            db_running_config = running_config_rules[0][0]
#
#            result : Tuple[SliceModel, bool] = update_or_create_object(self.database, SliceModel, str_slice_key, {
#                'context_fk'          : db_context,
#                'slice_uuid'          : slice_uuid,
#                'slice_constraints_fk': db_constraints,
#                'slice_status'        : grpc_to_enum__slice_status(request.slice_status.slice_status),
#                'slice_config_fk'     : db_running_config,
#                'slice_owner_uuid'    : request.slice_owner.owner_uuid.uuid,
#                'slice_owner_string'  : request.slice_owner.owner_string,
#            })
#            db_slice, updated = result
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#                endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#                endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#                str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#                str_slice_endpoint_key = key_to_str([str_slice_key, str_endpoint_key], separator='--')
#                result : Tuple[SliceEndPointModel, bool] = get_or_create_object(
#                    self.database, SliceEndPointModel, str_slice_endpoint_key, {
#                        'slice_fk': db_slice, 'endpoint_fk': db_endpoint})
#                #db_slice_endpoint, slice_endpoint_created = result
#
#            for i,service_id in enumerate(request.slice_service_ids):
#                service_uuid         = service_id.service_uuid.uuid
#                service_context_uuid = service_id.context_id.context_uuid.uuid
#                str_service_key = key_to_str([service_context_uuid, service_uuid])
#                db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
#
#                str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--')
#                result : Tuple[SliceServiceModel, bool] = get_or_create_object(
#                    self.database, SliceServiceModel, str_slice_service_key, {
#                        'slice_fk': db_slice, 'service_fk': db_service})
#                #db_slice_service, slice_service_created = result
#
#            for i,subslice_id in enumerate(request.slice_subslice_ids):
#                subslice_uuid         = subslice_id.slice_uuid.uuid
#                subslice_context_uuid = subslice_id.context_id.context_uuid.uuid
#                str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid])
#                db_subslice : SliceModel = get_object(self.database, SliceModel, str_subslice_key)
#
#                str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--')
#                result : Tuple[SliceSubSliceModel, bool] = get_or_create_object(
#                    self.database, SliceSubSliceModel, str_slice_subslice_key, {
#                        'slice_fk': db_slice, 'sub_slice_fk': db_subslice})
#                #db_slice_subslice, slice_subslice_created = result
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            dict_slice_id = db_slice.dump_id()
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return SliceId(**dict_slice_id)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
#        with self.lock:
#            context_uuid = request.slice_id.context_id.context_uuid.uuid
#            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                    raise InvalidArgumentException(
#                        'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                        endpoint_topology_context_uuid,
#                        ['should be == {:s}({:s})'.format(
#                            'request.slice_id.context_id.context_uuid.uuid', context_uuid)])
#
#            slice_uuid = request.slice_id.slice_uuid.uuid
#            str_slice_key = key_to_str([context_uuid, slice_uuid])
#
#            if len(request.slice_constraints) > 0:
#                raise NotImplementedError('UnsetSlice: removal of constraints')
#            if len(request.slice_config.config_rules) > 0:
#                raise NotImplementedError('UnsetSlice: removal of config rules')
#            if len(request.slice_endpoint_ids) > 0:
#                raise NotImplementedError('UnsetSlice: removal of endpoints')
#
#            updated = False
#
#            for service_id in request.slice_service_ids:
#                service_uuid         = service_id.service_uuid.uuid
#                service_context_uuid = service_id.context_id.context_uuid.uuid
#                str_service_key = key_to_str([service_context_uuid, service_uuid])
#                str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--')
#                SliceServiceModel(self.database, str_slice_service_key).delete()
#                updated = True
#
#            for subslice_id in request.slice_subslice_ids:
#                subslice_uuid         = subslice_id.slice_uuid.uuid
#                subslice_context_uuid = subslice_id.context_id.context_uuid.uuid
#                str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid])
#                str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--')
#                SliceSubSliceModel(self.database, str_slice_subslice_key).delete()
#                updated = True
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            db_slice : SliceModel = get_object(self.database, SliceModel, str_slice_key)
#            dict_slice_id = db_slice.dump_id()
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return SliceId(**dict_slice_id)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
#        with self.lock:
#            context_uuid = request.context_id.context_uuid.uuid
#            slice_uuid = request.slice_uuid.uuid
#            db_slice = SliceModel(self.database, key_to_str([context_uuid, slice_uuid]), auto_load=False)
#            found = db_slice.load()
#            if not found: return Empty()
#
#            dict_slice_id = db_slice.dump_id()
#            db_slice.delete()
#
#            event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        """Stream a SliceEvent for each message consumed from TOPIC_SLICE."""
        yield from self._stream_events(TOPIC_SLICE, SliceEvent)


    # ----- Connection -------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
#        with self.session() as session:
#            result = session.query(DeviceModel).all()
#            return DeviceIdList(device_ids=[device.dump_id() for device in result])
#
#        with self.lock:
#            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
#            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
#            db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
#            db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
#            return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
#        with self.lock:
#            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
#            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
#            db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
#            db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
#            return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
#        with self.lock:
#            db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid)
#            return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True))

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
#        with self.lock:
#            connection_uuid = request.connection_id.connection_uuid.uuid
#
#            connection_attributes = {'connection_uuid': connection_uuid}
#
#            service_context_uuid = request.service_id.context_id.context_uuid.uuid
#            service_uuid = request.service_id.service_uuid.uuid
#            if len(service_context_uuid) > 0 and len(service_uuid) > 0:
#                str_service_key = key_to_str([service_context_uuid, service_uuid])
#                db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
#                connection_attributes['service_fk'] = db_service
#
#            path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '')
#            db_path = path_hops_result[0]
#            connection_attributes['path_fk'] = db_path
#
#            result : Tuple[ConnectionModel, bool] = update_or_create_object(
#                self.database, ConnectionModel, connection_uuid, connection_attributes)
#            db_connection, updated = result
#
#            for sub_service_id in request.sub_service_ids:
#                sub_service_uuid         = sub_service_id.service_uuid.uuid
#                sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid
#                str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid])
#                db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key)
#
#                str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--')
#                result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object(
#                    self.database, ConnectionSubServiceModel, str_connection_sub_service_key, {
#                        'connection_fk': db_connection, 'sub_service_fk': db_service})
#                #db_connection_sub_service, connection_sub_service_created = result
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            dict_connection_id = db_connection.dump_id()
#            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
#            return ConnectionId(**dict_connection_id)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
#        with self.lock:
#            db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False)
#            found = db_connection.load()
#            if not found: return Empty()
#
#            dict_connection_id = db_connection.dump_id()
#            db_connection.delete()
#
#            event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
#            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        """Stream a ConnectionEvent for each message consumed from TOPIC_CONNECTION."""
        yield from self._stream_events(TOPIC_CONNECTION, ConnectionEvent)


    # ----- Policy -----------------------------------------------------------------------------------------------------

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList:
#        with self.lock:
#            db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel)
#            db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk'))
#            return PolicyRuleIdList(policyRuleIdList=[db_policy_rule.dump_id() for db_policy_rule in db_policy_rules])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList:
#        with self.lock:
#            db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel)
#            db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk'))
#            return PolicyRuleList(policyRules=[db_policy_rule.dump() for db_policy_rule in db_policy_rules])

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule:
#        with self.lock:
#            policy_rule_uuid = request.uuid.uuid
#            db_policy_rule: PolicyRuleModel = get_object(self.database, PolicyRuleModel, policy_rule_uuid)
#            return PolicyRule(**db_policy_rule.dump())

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId:
#        with self.lock:
#            policy_rule_type = request.WhichOneof('policy_rule')
#            policy_rule_json = grpc_message_to_json(request)
#            policy_rule_uuid = policy_rule_json[policy_rule_type]['policyRuleBasic']['policyRuleId']['uuid']['uuid']
#            result: Tuple[PolicyRuleModel, bool] = update_or_create_object(
#                self.database, PolicyRuleModel, policy_rule_uuid, {'value': json.dumps(policy_rule_json)})
#            db_policy, updated = result
#
#            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            dict_policy_id = db_policy.dump_id()
#            #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id})
#            return PolicyRuleId(**dict_policy_id)

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty:
#        with self.lock:
#            policy_uuid = request.uuid.uuid
#            db_policy = PolicyRuleModel(self.database, policy_uuid, auto_load=False)
#            found = db_policy.load()
#            if not found: return Empty()
#
#            dict_policy_id = db_policy.dump_id()
#            db_policy.delete()
#            #event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id})
#            return Empty()