import grpc, json, logging, operator from typing import Iterator, List, Set, Tuple from common.message_broker.MessageBroker import MessageBroker from common.orm.Database import Database from common.orm.HighLevel import ( get_all_objects, get_object, get_or_create_object, get_related_objects, update_or_create_object) from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException from context.proto.context_pb2 import ( Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from context.proto.context_pb2_grpc import ContextServiceServicer from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, grpc_config_rules_to_raw, update_config from context.service.database.ConstraintModel import ConstraintModel, ConstraintsModel, set_constraints from context.service.database.ContextModel import ContextModel from context.service.database.DeviceModel import ( DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers) from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types from context.service.database.Events import notify_event from context.service.database.LinkModel import LinkModel from context.service.database.RelationModels import ( LinkEndPointModel, ServiceEndPointModel, TopologyDeviceModel, TopologyLinkModel) from context.service.database.ServiceModel import ( ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type) from context.service.database.TopologyModel import TopologyModel from context.service.grpc_server.Constants import ( CONSUME_TIMEOUT, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_TOPOLOGY) LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'Context' METHOD_NAMES = [ 'ListContextIds', 'ListContexts', 'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents', 'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents', 'ListDeviceIds', 'ListDevices', 'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents', 'ListLinkIds', 'ListLinks', 'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents', 'ListServiceIds', 'ListServices', 'GetService', 'SetService', 'RemoveService', 'GetServiceEvents', ] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class ContextServiceServicerImpl(ContextServiceServicer): def __init__(self, database : Database, messagebroker : MessageBroker): LOGGER.debug('Creating Servicer...') self.database = database self.messagebroker = messagebroker LOGGER.debug('Servicer Created') # ----- Context ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel) db_contexts = sorted(db_contexts, key=operator.attrgetter('pk')) return ContextIdList(context_ids=[db_context.dump_id() for db_context in db_contexts]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel) db_contexts = sorted(db_contexts, key=operator.attrgetter('pk')) return ContextList(contexts=[db_context.dump() for db_context in db_contexts]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: context_uuid = request.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) return Context(**db_context.dump(include_services=True, include_topologies=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: context_uuid = request.context_id.context_uuid.uuid for i,topology_id in enumerate(request.topology_ids): topology_context_uuid = topology_id.context_id.context_uuid.uuid if topology_context_uuid != context_uuid: raise InvalidArgumentException( 'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid, ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) for i,service_id in enumerate(request.service_ids): service_context_uuid = service_id.context_id.context_uuid.uuid if service_context_uuid != context_uuid: raise InvalidArgumentException( 'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid, ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) result : Tuple[ContextModel, bool] = update_or_create_object( self.database, ContextModel, context_uuid, {'context_uuid': context_uuid}) db_context, updated = result for i,topology_id in enumerate(request.topology_ids): topology_context_uuid = topology_id.context_id.context_uuid.uuid topology_uuid = topology_id.topology_uuid.uuid get_object(self.database, TopologyModel, [context_uuid, topology_uuid]) # just to confirm it exists for i,service_id in enumerate(request.service_ids): service_context_uuid = service_id.context_id.context_uuid.uuid service_uuid = service_id.service_uuid.uuid get_object(self.database, ServiceModel, [context_uuid, service_uuid]) # just to confirm it exists event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_context_id = db_context.dump_id() notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id}) return ContextId(**dict_context_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_uuid.uuid db_context = ContextModel(self.database, context_uuid, auto_load=False) found = db_context.load() if not found: return Empty() dict_context_id = db_context.dump_id() db_context.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): yield ContextEvent(**json.loads(message.content)) # ----- Topology --------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList: context_uuid = request.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel) db_topologies = sorted(db_topologies, key=operator.attrgetter('pk')) return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList: context_uuid = request.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel) db_topologies = sorted(db_topologies, key=operator.attrgetter('pk')) return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology: str_key = key_to_str([request.context_id.context_uuid.uuid, request.topology_uuid.uuid]) db_topology : TopologyModel = get_object(self.database, TopologyModel, str_key) return Topology(**db_topology.dump(include_devices=True, include_links=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: context_uuid = request.topology_id.context_id.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) topology_uuid = request.topology_id.topology_uuid.uuid str_topology_key = key_to_str([context_uuid, topology_uuid]) result : Tuple[TopologyModel, bool] = update_or_create_object( self.database, TopologyModel, str_topology_key, {'context_fk': db_context, 'topology_uuid': topology_uuid}) db_topology,updated = result for device_id in request.device_ids: device_uuid = device_id.device_uuid.uuid db_device = get_object(self.database, DeviceModel, device_uuid) str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--') result : Tuple[TopologyDeviceModel, bool] = update_or_create_object( self.database, TopologyDeviceModel, str_topology_device_key, {'topology_fk': db_topology, 'device_fk': db_device}) #db_topology_device,topology_device_updated = result for link_id in request.link_ids: link_uuid = link_id.link_uuid.uuid db_link = get_object(self.database, LinkModel, link_uuid) str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--') result : Tuple[TopologyLinkModel, bool] = update_or_create_object( self.database, TopologyLinkModel, str_topology_link_key, {'topology_fk': db_topology, 'link_fk': db_link}) #db_topology_link,topology_link_updated = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_topology_id = db_topology.dump_id() notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) return TopologyId(**dict_topology_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_id.context_uuid.uuid topology_uuid = request.topology_uuid.uuid db_topology = TopologyModel(self.database, key_to_str([context_uuid, topology_uuid]), auto_load=False) found = db_topology.load() if not found: return Empty() dict_topology_id = db_topology.dump_id() db_topology.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): yield TopologyEvent(**json.loads(message.content)) # ----- Device ----------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel) db_devices = sorted(db_devices, key=operator.attrgetter('pk')) return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel) db_devices = sorted(db_devices, key=operator.attrgetter('pk')) return DeviceList(devices=[db_device.dump() for db_device in db_devices]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: device_uuid = request.device_uuid.uuid db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid) return Device(**db_device.dump( include_config_rules=True, include_drivers=True, include_endpoints=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId: device_uuid = request.device_id.device_uuid.uuid for i,endpoint in enumerate(request.device_endpoints): endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid if device_uuid != endpoint_device_uuid: raise InvalidArgumentException( 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) running_config_result = update_config(self.database, device_uuid, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, { 'device_uuid' : device_uuid, 'device_type' : request.device_type, 'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status), 'device_config_fk' : db_running_config, }) db_device, updated = result set_drivers(self.database, db_device, request.device_drivers) for i,endpoint in enumerate(request.device_endpoints): endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) endpoint_attributes = { 'device_fk' : db_device, 'endpoint_uuid': endpoint_uuid, 'endpoint_type': endpoint.endpoint_type, } endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key) str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--') result : Tuple[TopologyDeviceModel, bool] = get_or_create_object( self.database, TopologyDeviceModel, str_topology_device_key, { 'topology_fk': db_topology, 'device_fk': db_device}) #db_topology_device, topology_device_created = result str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') endpoint_attributes['topology_fk'] = db_topology result : Tuple[EndPointModel, bool] = update_or_create_object( self.database, EndPointModel, str_endpoint_key, endpoint_attributes) db_endpoint, endpoint_updated = result set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types) event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_device_id = db_device.dump_id() notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) return DeviceId(**dict_device_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: device_uuid = request.device_uuid.uuid db_device = DeviceModel(self.database, device_uuid, auto_load=False) found = db_device.load() if not found: return Empty() dict_device_id = db_device.dump_id() for db_endpoint_pk,_ in db_device.references(EndPointModel): db_endpoint = EndPointModel(self.database, db_endpoint_pk) for db_kpi_sample_type_pk,_ in db_endpoint.references(KpiSampleTypeModel): KpiSampleTypeModel(self.database, db_kpi_sample_type_pk).delete() db_endpoint.delete() for db_topology_device_pk,_ in db_device.references(TopologyDeviceModel): TopologyDeviceModel(self.database, db_topology_device_pk).delete() for db_driver_pk,_ in db_device.references(DriverModel): DriverModel(self.database, db_driver_pk).delete() db_config = ConfigModel(self.database, db_device.device_config_fk) for db_config_rule_pk,_ in db_config.references(ConfigRuleModel): ConfigRuleModel(self.database, db_config_rule_pk).delete() db_device.delete() db_config.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): yield DeviceEvent(**json.loads(message.content)) # ----- Link ------------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList: db_links : List[LinkModel] = get_all_objects(self.database, LinkModel) db_links = sorted(db_links, key=operator.attrgetter('pk')) return LinkIdList(link_ids=[db_link.dump_id() for db_link in db_links]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList: db_links : List[LinkModel] = get_all_objects(self.database, LinkModel) db_links = sorted(db_links, key=operator.attrgetter('pk')) return LinkList(links=[db_link.dump() for db_link in db_links]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link: link_uuid = request.link_uuid.uuid db_link : LinkModel = get_object(self.database, LinkModel, link_uuid) return Link(**db_link.dump()) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId: link_uuid = request.link_id.link_uuid.uuid result : Tuple[LinkModel, bool] = update_or_create_object( self.database, LinkModel, link_uuid, {'link_uuid': link_uuid}) db_link, updated = result for endpoint_id in request.link_endpoint_ids: endpoint_uuid = endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) db_topology = None if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key) str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--') get_object(self.database, TopologyDeviceModel, str_topology_device_key) # check device is in topology str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--') result : Tuple[LinkEndPointModel, bool] = get_or_create_object( self.database, LinkEndPointModel, str_link_endpoint_key, { 'link_fk': db_link, 'endpoint_fk': db_endpoint}) #db_link_endpoint, link_endpoint_created = result if db_topology is not None: str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--') result : Tuple[TopologyLinkModel, bool] = get_or_create_object( self.database, TopologyLinkModel, str_topology_link_key, { 'topology_fk': db_topology, 'link_fk': db_link}) #db_topology_link, topology_link_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_link_id = db_link.dump_id() notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) return LinkId(**dict_link_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: link_uuid = request.link_uuid.uuid db_link = LinkModel(self.database, link_uuid, auto_load=False) found = db_link.load() if not found: return Empty() dict_link_id = db_link.dump_id() for db_link_endpoint_pk,_ in db_link.references(LinkEndPointModel): LinkEndPointModel(self.database, db_link_endpoint_pk).delete() for db_topology_link_pk,_ in db_link.references(TopologyLinkModel): TopologyLinkModel(self.database, db_topology_link_pk).delete() db_link.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): yield LinkEvent(**json.loads(message.content)) # ----- Service ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel) db_services = sorted(db_services, key=operator.attrgetter('pk')) return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel) db_services = sorted(db_services, key=operator.attrgetter('pk')) return ServiceList(services=[db_service.dump() for db_service in db_services]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service: str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) return Service(**db_service.dump( include_endpoint_ids=True, include_constraints=True, include_config_rules=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: context_uuid = request.service_id.context_id.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) for i,endpoint_id in enumerate(request.service_endpoint_ids): endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: raise InvalidArgumentException( 'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), endpoint_topology_context_uuid, ['should be == {:s}({:s})'.format('request.service_id.context_id.context_uuid.uuid', context_uuid)]) service_uuid = request.service_id.service_uuid.uuid str_service_key = key_to_str([context_uuid, service_uuid]) constraints_result = set_constraints( self.database, str_service_key, 'constraints', request.service_constraints) db_constraints = constraints_result[0][0] config_rules = grpc_config_rules_to_raw(request.service_config.config_rules) running_config_result = update_config(self.database, str_service_key, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, { 'context_fk' : db_context, 'service_uuid' : service_uuid, 'service_type' : grpc_to_enum__service_type(request.service_type), 'service_constraints_fk': db_constraints, 'service_status' : grpc_to_enum__service_status(request.service_status.service_status), 'service_config_fk' : db_running_config, }) db_service, updated = result for i,endpoint_id in enumerate(request.service_endpoint_ids): endpoint_uuid = endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) str_service_endpoint_key = key_to_str([service_uuid, endpoint_device_uuid], separator='--') result : Tuple[ServiceEndPointModel, bool] = get_or_create_object( self.database, ServiceEndPointModel, str_service_endpoint_key, { 'service_fk': db_service, 'endpoint_fk': db_endpoint}) #db_service_endpoint, service_endpoint_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_service_id = db_service.dump_id() notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) return ServiceId(**dict_service_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_id.context_uuid.uuid service_uuid = request.service_uuid.uuid db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False) found = db_service.load() if not found: return Empty() dict_service_id = db_service.dump_id() for db_service_endpoint_pk,_ in db_service.references(ServiceEndPointModel): ServiceEndPointModel(self.database, db_service_endpoint_pk).delete() db_config = ConfigModel(self.database, db_service.service_config_fk) for db_config_rule_pk,_ in db_config.references(ConfigRuleModel): ConfigRuleModel(self.database, db_config_rule_pk).delete() db_constraints = ConstraintsModel(self.database, db_service.service_constraints_fk) for db_constraint_pk,_ in db_constraints.references(ConstraintModel): ConstraintModel(self.database, db_constraint_pk).delete() db_service.delete() db_config.delete() db_constraints.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): yield ServiceEvent(**json.loads(message.content))