# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import uuid import grpc, json, logging, operator, threading from typing import Iterator, List, Set, Tuple, Union from common.message_broker.MessageBroker import MessageBroker from context.service.Database import Database from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList, ConfigActionEnum) from common.proto.context_pb2_grpc import ContextServiceServicer from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException from sqlalchemy.orm import Session, contains_eager, selectinload from common.rpc_method_wrapper.ServiceExceptions import NotFoundException from context.service.database.ConfigModel import grpc_config_rules_to_raw from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel from context.service.database.ConfigModel import ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel from common.orm.backend.Tools import key_to_str from ..database.KpiSampleType import grpc_to_enum__kpi_sample_type """ from context.service.database.ConnectionModel import ConnectionModel, set_path from context.service.database.ConstraintModel import set_constraints from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types from context.service.database.Events import notify_event from context.service.database.LinkModel import LinkModel from context.service.database.RelationModels import ( ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel, SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel) from context.service.database.ServiceModel import ( ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type) from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status from context.service.database.TopologyModel import TopologyModel """ from context.service.database.ContextModel import ContextModel from context.service.database.TopologyModel import TopologyModel from context.service.database.Events import notify_event from context.service.database.EndPointModel import EndPointModel from context.service.database.EndPointModel import KpiSampleTypeModel from .Constants import ( CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY) LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'Context' METHOD_NAMES = [ 'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents', 'ListContextIds', 'ListContexts', 'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents', 'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents', 'ListDeviceIds', 'ListDevices', 'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents', 'ListLinkIds', 'ListLinks', 'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents', 'ListServiceIds', 'ListServices', 'GetService', 'SetService', 'RemoveService', 'GetServiceEvents', 'ListSliceIds', 'ListSlices', 'GetSlice', 'SetSlice', 'RemoveSlice', 'GetSliceEvents', ] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class ContextServiceServicerImpl(ContextServiceServicer): def __init__(self, session : Session, messagebroker : MessageBroker): LOGGER.debug('Creating Servicer...') self.lock = threading.Lock() self.session = session self.database = Database(session) self.messagebroker = messagebroker LOGGER.debug('Servicer Created') # ----- Context ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: with self.session() as session: result = session.query(ContextModel).all() return ContextIdList(context_ids=[row.dump_id() for row in result]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: with self.session() as session: result = session.query(ContextModel).all() return ContextList(contexts=[row.dump() for row in result]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: context_uuid = request.context_uuid.uuid with self.session() as session: result = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() if not result: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) return Context(**result.dump()) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: context_uuid = request.context_id.context_uuid.uuid for i, topology_id in enumerate(request.topology_ids): topology_context_uuid = topology_id.context_id.context_uuid.uuid if topology_context_uuid != context_uuid: raise InvalidArgumentException( 'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid, ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) for i, service_id in enumerate(request.service_ids): service_context_uuid = service_id.context_id.context_uuid.uuid if service_context_uuid != context_uuid: raise InvalidArgumentException( 'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid, ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) context_add = ContextModel(context_uuid=context_uuid) updated = True with self.session() as session: result = session.query(ContextModel).filter_by(context_uuid=context_uuid).all() if not result: updated = False session.merge(context_add) session.commit() event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_context_id = context_add.dump_id() notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id}) return ContextId(**context_add.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_uuid.uuid with self.session() as session: result = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() if not result: return Empty() session.query(ContextModel).filter_by(context_uuid=context_uuid).delete() session.commit() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': result.dump_id()}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): yield ContextEvent(**json.loads(message.content)) # ----- Topology --------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList: context_uuid = request.context_uuid.uuid with self.session() as session: result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() if not result: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) db_topologies = result.topology return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList: context_uuid = request.context_uuid.uuid with self.session() as session: result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by( context_uuid=context_uuid).one_or_none() if not result: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) db_topologies = result.topology return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology: context_uuid = request.context_id.context_uuid.uuid topology_uuid = request.topology_uuid.uuid with self.session() as session: result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).options(contains_eager(TopologyModel.context)).one_or_none() if not result: raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid) return Topology(**result.dump()) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: context_uuid = request.topology_id.context_id.context_uuid.uuid topology_uuid = request.topology_id.topology_uuid.uuid with self.session() as session: topology_add = TopologyModel(topology_uuid=topology_uuid, context_uuid=context_uuid) updated = True result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() if not result: updated = False session.merge(topology_add) session.commit() result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_topology_id = result.dump_id() notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) return TopologyId(**dict_topology_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_id.context_uuid.uuid topology_uuid = request.topology_uuid.uuid with self.session() as session: result = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, context_uuid=context_uuid).one_or_none() if not result: return Empty() dict_topology_id = result.dump_id() session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, context_uuid=context_uuid).delete() session.commit() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): yield TopologyEvent(**json.loads(message.content)) # ----- Device ----------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: with self.session() as session: result = session.query(DeviceModel).all() return DeviceIdList(device_ids=[device.dump_id() for device in result]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: with self.session() as session: result = session.query(DeviceModel).all() return DeviceList(devices=[device.dump_id() for device in result]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: device_uuid = request.device_uuid.uuid with self.session() as session: result = session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid).one_or_none() if not result: raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid) rd = result.dump() rt = Device(**rd) return rt @safe_and_metered_rpc_method(METRICS, LOGGER) def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId: device_uuid = request.device_id.device_uuid.uuid for i,endpoint in enumerate(request.device_endpoints): endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid if device_uuid != endpoint_device_uuid: raise InvalidArgumentException( 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) running_config_result = self.update_config(device_uuid, 'running', config_rules) db_running_config = running_config_result[0][0] config_uuid = db_running_config.config_uuid new_obj = DeviceModel(**{ 'device_uuid' : device_uuid, 'device_type' : request.device_type, 'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status), 'device_config_uuid' : config_uuid, }) result: Tuple[DeviceModel, bool] = self.database.create_or_update(new_obj) db_device, updated = result self.set_drivers(db_device, request.device_drivers) for i,endpoint in enumerate(request.device_endpoints): endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) endpoint_attributes = { 'device_uuid' : db_device.device_uuid, 'endpoint_uuid': endpoint_uuid, 'endpoint_type': endpoint.endpoint_type, } endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) db_topology : TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') endpoint_attributes['topology_uuid'] = db_topology.topology_uuid new_endpoint = EndPointModel(**endpoint_attributes) result : Tuple[EndPointModel, bool] = self.database.create_or_update(new_endpoint) db_endpoint, updated = result self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types) # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_device_id = db_device.dump_id() # notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) return DeviceId(**dict_device_id) def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types): db_endpoint_pk = db_endpoint.endpoint_uuid for kpi_sample_type in grpc_endpoint_kpi_sample_types: orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) # str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) data = {'endpoint_uuid': db_endpoint_pk, 'kpi_sample_type': orm_kpi_sample_type.name, 'kpi_uuid': str(uuid.uuid4())} db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data) self.database.create(db_endpoint_kpi_sample_type) def set_drivers(self, db_device: DeviceModel, grpc_device_drivers): db_device_pk = db_device.device_uuid for driver in grpc_device_drivers: orm_driver = grpc_to_enum__device_driver(driver) str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) driver_config = { "driver_uuid": str(uuid.uuid4()), "device_uuid": db_device_pk, "driver": orm_driver.name } db_device_driver = DriverModel(**driver_config) db_device_driver.device_fk = db_device db_device_driver.driver = orm_driver self.database.create_or_update(db_device_driver) def update_config( self, db_parent_pk: str, config_name: str, raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]] ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: str_config_key = key_to_str([db_parent_pk, config_name], separator=':') result = self.database.get_or_create(ConfigModel, db_parent_pk) db_config, created = result LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump())) db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): if action == ORM_ConfigActionEnum.SET: result : Tuple[ConfigRuleModel, bool] = self.set_config_rule( db_config, position, resource_key, resource_value) db_config_rule, updated = result db_objects.append((db_config_rule, updated)) elif action == ORM_ConfigActionEnum.DELETE: self.delete_config_rule(db_config, resource_key) else: msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' raise AttributeError( msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) return db_objects def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str, ): # -> Tuple[ConfigRuleModel, bool]: from src.context.service.database.Tools import fast_hasher str_rule_key_hash = fast_hasher(resource_key) str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':') pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key)) data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position, 'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value} to_add = ConfigRuleModel(**data) result, updated = self.database.create_or_update(to_add) return result, updated def delete_config_rule( self, db_config: ConfigModel, resource_key: str ) -> None: from src.context.service.database.Tools import fast_hasher str_rule_key_hash = fast_hasher(resource_key) str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) if db_config_rule is None: return db_config_rule.delete() def delete_all_config_rules(self, db_config: ConfigModel) -> None: db_config_rule_pks = db_config.references(ConfigRuleModel) for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete() """ for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): if action == ORM_ConfigActionEnum.SET: result: Tuple[ConfigRuleModel, bool] = set_config_rule( database, db_config, position, resource_key, resource_value) db_config_rule, updated = result db_objects.append((db_config_rule, updated)) elif action == ORM_ConfigActionEnum.DELETE: delete_config_rule(database, db_config, resource_key) else: msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' raise AttributeError( msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) return db_objects """ @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: with self.lock: device_uuid = request.device_uuid.uuid db_device = DeviceModel(self.database, device_uuid, auto_load=False) found = db_device.load() if not found: return Empty() dict_device_id = db_device.dump_id() db_device.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): yield DeviceEvent(**json.loads(message.content)) """ # ----- Link ------------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList: with self.lock: db_links : List[LinkModel] = get_all_objects(self.database, LinkModel) db_links = sorted(db_links, key=operator.attrgetter('pk')) return LinkIdList(link_ids=[db_link.dump_id() for db_link in db_links]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList: with self.lock: db_links : List[LinkModel] = get_all_objects(self.database, LinkModel) db_links = sorted(db_links, key=operator.attrgetter('pk')) return LinkList(links=[db_link.dump() for db_link in db_links]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link: with self.lock: link_uuid = request.link_uuid.uuid db_link : LinkModel = get_object(self.database, LinkModel, link_uuid) return Link(**db_link.dump()) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId: with self.lock: link_uuid = request.link_id.link_uuid.uuid result : Tuple[LinkModel, bool] = update_or_create_object( self.database, LinkModel, link_uuid, {'link_uuid': link_uuid}) db_link, updated = result for endpoint_id in request.link_endpoint_ids: endpoint_uuid = endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) db_topology = None if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key) str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--') # check device is in topology get_object(self.database, TopologyDeviceModel, str_topology_device_key) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--') result : Tuple[LinkEndPointModel, bool] = get_or_create_object( self.database, LinkEndPointModel, str_link_endpoint_key, { 'link_fk': db_link, 'endpoint_fk': db_endpoint}) #db_link_endpoint, link_endpoint_created = result if db_topology is not None: str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--') result : Tuple[TopologyLinkModel, bool] = get_or_create_object( self.database, TopologyLinkModel, str_topology_link_key, { 'topology_fk': db_topology, 'link_fk': db_link}) #db_topology_link, topology_link_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_link_id = db_link.dump_id() notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) return LinkId(**dict_link_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: with self.lock: link_uuid = request.link_uuid.uuid db_link = LinkModel(self.database, link_uuid, auto_load=False) found = db_link.load() if not found: return Empty() dict_link_id = db_link.dump_id() db_link.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): yield LinkEvent(**json.loads(message.content)) # ----- Service ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList: with self.lock: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel) db_services = sorted(db_services, key=operator.attrgetter('pk')) return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: with self.lock: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel) db_services = sorted(db_services, key=operator.attrgetter('pk')) return ServiceList(services=[db_service.dump() for db_service in db_services]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service: with self.lock: str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) return Service(**db_service.dump( include_endpoint_ids=True, include_constraints=True, include_config_rules=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: with self.lock: context_uuid = request.service_id.context_id.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) for i,endpoint_id in enumerate(request.service_endpoint_ids): endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: raise InvalidArgumentException( 'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), endpoint_topology_context_uuid, ['should be == {:s}({:s})'.format( 'request.service_id.context_id.context_uuid.uuid', context_uuid)]) service_uuid = request.service_id.service_uuid.uuid str_service_key = key_to_str([context_uuid, service_uuid]) constraints_result = set_constraints( self.database, str_service_key, 'constraints', request.service_constraints) db_constraints = constraints_result[0][0] config_rules = grpc_config_rules_to_raw(request.service_config.config_rules) running_config_result = update_config(self.database, str_service_key, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, { 'context_fk' : db_context, 'service_uuid' : service_uuid, 'service_type' : grpc_to_enum__service_type(request.service_type), 'service_constraints_fk': db_constraints, 'service_status' : grpc_to_enum__service_status(request.service_status.service_status), 'service_config_fk' : db_running_config, }) db_service, updated = result for i,endpoint_id in enumerate(request.service_endpoint_ids): endpoint_uuid = endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--') result : Tuple[ServiceEndPointModel, bool] = get_or_create_object( self.database, ServiceEndPointModel, str_service_endpoint_key, { 'service_fk': db_service, 'endpoint_fk': db_endpoint}) #db_service_endpoint, service_endpoint_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_service_id = db_service.dump_id() notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) return ServiceId(**dict_service_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: with self.lock: context_uuid = request.context_id.context_uuid.uuid service_uuid = request.service_uuid.uuid db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False) found = db_service.load() if not found: return Empty() dict_service_id = db_service.dump_id() db_service.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): yield ServiceEvent(**json.loads(message.content)) # ----- Slice ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList: with self.lock: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) db_slices = sorted(db_slices, key=operator.attrgetter('pk')) return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList: with self.lock: db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) db_slices = sorted(db_slices, key=operator.attrgetter('pk')) return SliceList(slices=[db_slice.dump() for db_slice in db_slices]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice: with self.lock: str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid]) db_slice : SliceModel = get_object(self.database, SliceModel, str_key) return Slice(**db_slice.dump( include_endpoint_ids=True, include_constraints=True, include_config_rules=True, include_service_ids=True, include_subslice_ids=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: with self.lock: context_uuid = request.slice_id.context_id.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) for i,endpoint_id in enumerate(request.slice_endpoint_ids): endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: raise InvalidArgumentException( 'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), endpoint_topology_context_uuid, ['should be == {:s}({:s})'.format( 'request.slice_id.context_id.context_uuid.uuid', context_uuid)]) slice_uuid = request.slice_id.slice_uuid.uuid str_slice_key = key_to_str([context_uuid, slice_uuid]) constraints_result = set_constraints( self.database, str_slice_key, 'constraints', request.slice_constraints) db_constraints = constraints_result[0][0] config_rules = grpc_config_rules_to_raw(request.slice_config.config_rules) running_config_result = update_config(self.database, str_slice_key, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[SliceModel, bool] = update_or_create_object(self.database, SliceModel, str_slice_key, { 'context_fk' : db_context, 'slice_uuid' : slice_uuid, 'slice_constraints_fk': db_constraints, 'slice_status' : grpc_to_enum__slice_status(request.slice_status.slice_status), 'slice_config_fk' : db_running_config, }) db_slice, updated = result for i,endpoint_id in enumerate(request.slice_endpoint_ids): endpoint_uuid = endpoint_id.endpoint_uuid.uuid endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) str_slice_endpoint_key = key_to_str([slice_uuid, str_endpoint_key], separator='--') result : Tuple[SliceEndPointModel, bool] = get_or_create_object( self.database, SliceEndPointModel, str_slice_endpoint_key, { 'slice_fk': db_slice, 'endpoint_fk': db_endpoint}) #db_slice_endpoint, slice_endpoint_created = result for i,service_id in enumerate(request.slice_service_ids): service_uuid = service_id.service_uuid.uuid service_context_uuid = service_id.context_id.context_uuid.uuid str_service_key = key_to_str([service_context_uuid, service_uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key) str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--') result : Tuple[SliceServiceModel, bool] = get_or_create_object( self.database, SliceServiceModel, str_slice_service_key, { 'slice_fk': db_slice, 'service_fk': db_service}) #db_slice_service, slice_service_created = result for i,subslice_id in enumerate(request.slice_subslice_ids): subslice_uuid = subslice_id.slice_uuid.uuid subslice_context_uuid = subslice_id.context_id.context_uuid.uuid str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid]) db_subslice : SliceModel = get_object(self.database, SliceModel, str_subslice_key) str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--') result : Tuple[SliceSubSliceModel, bool] = get_or_create_object( self.database, SliceSubSliceModel, str_slice_subslice_key, { 'slice_fk': db_slice, 'sub_slice_fk': db_subslice}) #db_slice_subslice, slice_subslice_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_slice_id = db_slice.dump_id() notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id}) return SliceId(**dict_slice_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty: with self.lock: context_uuid = request.context_id.context_uuid.uuid slice_uuid = request.slice_uuid.uuid db_slice = SliceModel(self.database, key_to_str([context_uuid, slice_uuid]), auto_load=False) found = db_slice.load() if not found: return Empty() dict_slice_id = db_slice.dump_id() db_slice.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): yield SliceEvent(**json.loads(message.content)) # ----- Connection ------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: with self.lock: str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel) db_connections = sorted(db_connections, key=operator.attrgetter('pk')) return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections]) @safe_and_metered_rpc_method(METRICS, LOGGER) def ListConnections(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: with self.lock: str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel) db_connections = sorted(db_connections, key=operator.attrgetter('pk')) return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections]) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection: with self.lock: db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid) return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True)) @safe_and_metered_rpc_method(METRICS, LOGGER) def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: with self.lock: connection_uuid = request.connection_id.connection_uuid.uuid connection_attributes = {'connection_uuid': connection_uuid} service_context_uuid = request.service_id.context_id.context_uuid.uuid service_uuid = request.service_id.service_uuid.uuid if len(service_context_uuid) > 0 and len(service_uuid) > 0: str_service_key = key_to_str([service_context_uuid, service_uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key) connection_attributes['service_fk'] = db_service path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '') db_path = path_hops_result[0] connection_attributes['path_fk'] = db_path result : Tuple[ConnectionModel, bool] = update_or_create_object( self.database, ConnectionModel, connection_uuid, connection_attributes) db_connection, updated = result for sub_service_id in request.sub_service_ids: sub_service_uuid = sub_service_id.service_uuid.uuid sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid]) db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key) str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--') result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object( self.database, ConnectionSubServiceModel, str_connection_sub_service_key, { 'connection_fk': db_connection, 'sub_service_fk': db_service}) #db_connection_sub_service, connection_sub_service_created = result event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE dict_connection_id = db_connection.dump_id() notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id}) return ConnectionId(**dict_connection_id) @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: with self.lock: db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False) found = db_connection.load() if not found: return Empty() dict_connection_id = db_connection.dump_id() db_connection.delete() event_type = EventTypeEnum.EVENTTYPE_REMOVE notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id}) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT): yield ConnectionEvent(**json.loads(message.content)) """