# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import grpc, json, logging, operator, sqlalchemy, threading, time, uuid
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
#from sqlalchemy.dialects.postgresql import UUID, insert
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
#from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty, EventTypeEnum,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
    ConfigActionEnum, Constraint)
#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.tools.object_factory.Context import json_context_id
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import (
    InvalidArgumentException, NotFoundException, OperationFailedException)
#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
#from context.service.Database import Database
#from context.service.database.ConfigModel import (
#    ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
#from context.service.database.ConnectionModel import ConnectionModel, set_path
#from context.service.database.ConstraintModel import (
#    ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from context.service.database.ContextModel import ContextModel
#from context.service.database.DeviceModel import (
#    DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel)
#from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
#from context.service.database.Events import notify_event
#from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type
#from context.service.database.LinkModel import LinkModel
#from context.service.database.PolicyRuleModel import PolicyRuleModel
#from context.service.database.RelationModels import (
#    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
#    SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
#from context.service.database.ServiceModel import (
#    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
from context.service.database.TopologyModel import TopologyModel
#from .Constants import (
#    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
#    TOPIC_TOPOLOGY)
#from .ChangeFeedClient import ChangeFeedClient

LOGGER = logging.getLogger(__name__)

# Service name and RPC method names handed to create_metrics() to build METRICS.
SERVICE_NAME = 'Context'
METHOD_NAMES = [
    'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
    'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
    'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    'UnsetService',      'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    """gRPC servicer for the Context service, backed by a SQLAlchemy engine.

    Each RPC defines a local ``callback(session)`` and executes it through
    ``run_transaction(sessionmaker(bind=self.db_engine), callback)``. Callbacks return plain
    dictionaries/lists (never ORM objects); the gRPC messages are built from them after
    ``run_transaction`` returns.
    """

    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        """Store the database engine and message broker used by the RPC methods.

        Args:
            db_engine: SQLAlchemy engine every RPC binds its session factory to.
            messagebroker: broker instance; only stored here, the code using it is commented out.
        """
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        #self.lock = threading.Lock()
        #session = sessionmaker(bind=db_engine, expire_on_commit=False)
        #self.session = session
        #self.database = Database(session)
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return a ContextIdList built from ``dump_id()`` of every ContextModel row."""
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ContextModel] = session.query(ContextModel).all()
            return [obj.dump_id() for obj in obj_list]
        return ContextIdList(context_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return a ContextList built from ``dump()`` of every ContextModel row."""
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ContextModel] = session.query(ContextModel).all()
            return [obj.dump() for obj in obj_list]
        return ContextList(contexts=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Return the context whose UUID is ``request.context_uuid.uuid``.

        Raises:
            NotFoundException: if no ContextModel row has that UUID.
        """
        context_uuid = request.context_uuid.uuid
        def callback(session : Session) -> Optional[Dict]:
            obj : Optional[ContextModel] = session\
                .query(ContextModel)\
                .filter_by(context_uuid=context_uuid)\
                .one_or_none()
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
        return Context(**obj)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create the context described by ``request`` or update its name if it already exists.

        Raises:
            InvalidArgumentException: if any entry of ``request.topology_ids``, ``request.service_ids``
                or ``request.slice_ids`` carries a context UUID different from the request's own.
            NotFoundException: if the row cannot be read back after being written.
        """
        context_uuid = request.context_id.context_uuid.uuid
        context_name = request.name

        # All nested identifiers must reference the context being written. Fields are checked in this
        # fixed order and the first mismatch aborts the request.
        for field_name in ('topology_ids', 'service_ids', 'slice_ids'):
            for i, entity_id in enumerate(getattr(request, field_name)):
                entity_context_uuid = entity_id.context_id.context_uuid.uuid
                if entity_context_uuid == context_uuid: continue
                raise InvalidArgumentException(
                    'request.{:s}[{:d}].context_id.context_uuid.uuid'.format(field_name, i), entity_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

        def callback(session : Session) -> Tuple[Optional[Dict], bool]:
            # with_for_update() issues SELECT ... FOR UPDATE so the existence check and the write that
            # follows operate on the same locked row.
            obj : Optional[ContextModel] = \
                session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none()
            is_update = obj is not None
            if is_update:
                obj.context_name = context_name
                session.merge(obj)
            else:
                session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time()))
            # Read the row back so the returned id comes from what the session now holds.
            obj = session.get(ContextModel, {'context_uuid': context_uuid})
            return (None if obj is None else obj.dump_id()), is_update

        obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': obj_id})
        return ContextId(**obj_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Delete the context whose UUID is ``request.context_uuid.uuid``.

        Always returns ``Empty()``, whether or not a row was deleted.
        """
        context_uuid = request.context_uuid.uuid

        def callback(session : Session) -> bool:
            num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
            return num_deleted > 0

        # 'deleted' is only consumed by the commented-out notification below.
        deleted = run_transaction(sessionmaker(bind=self.db_engine), callback)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Stub for the context event stream.

        NOTE(review): the body is currently empty, so this returns None rather than the annotated
        Iterator[ContextEvent]; both earlier implementations are kept below as comments.
        """
        pass
        #for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
        #    yield ContextEvent(**json.loads(message.content))
        #cf = ChangeFeedClient()
        #ready = cf.initialize()
        #if not ready: raise OperationFailedException('Initialize ChangeFeed')
        #for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
        #    if is_delete:
        #        event_type = EventTypeEnum.EVENTTYPE_REMOVE
        #    else:
        #        is_create = (timestamp - after.get('created_at')) < 1.0
        #        event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
        #    event = {
        #        'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
        #        'context_id': json_context_id(primary_key[0]),
        #    }
        #    yield ContextEvent(**event)

    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return ``dump_id()`` of every TopologyModel row whose context_uuid matches the request.

        No existence check is made on the context itself; if nothing matches, the list is empty.
        """
        context_uuid = request.context_uuid.uuid

        def callback(session : Session) -> List[Dict]:
            obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all()
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
            return [obj.dump_id() for obj in obj_list]

        #with self.session() as session:
        #    result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
        #    if not result:
        #        raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
        #    db_topologies = result.topology
        return TopologyIdList(topology_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return ``dump()`` of every TopologyModel row whose context_uuid matches the request.

        No existence check is made on the context itself; if nothing matches, the list is empty.
        """
        context_uuid = request.context_uuid.uuid

        def callback(session : Session) -> List[Dict]:
            obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all()
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
            return [obj.dump() for obj in obj_list]

        #with self.session() as session:
        #    result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(
        #        context_uuid=context_uuid).one_or_none()
        #    if not result:
        #        raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
        #    db_topologies = result.topology
        return TopologyList(topologies=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the topology identified by the (context UUID, topology UUID) pair in ``request``.

        Raises:
            NotFoundException: if no TopologyModel row matches both UUIDs; the exception carries the
                topology UUID that was looked up.
        """
        context_uuid = request.context_id.context_uuid.uuid
        topology_uuid = request.topology_uuid.uuid

        def callback(session : Session) -> Optional[Dict]:
            obj : Optional[TopologyModel] = session\
                .query(TopologyModel)\
                .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid)\
                .one_or_none()
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
        # Report the topology that is missing, not the context it was searched in.
        if obj is None: raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid)
        return Topology(**obj)

#        result, dump = self.database.get_object(TopologyModel, topology_uuid, True)
#        with self.session() as session:
#            devs = None
#            links = None
#
#            filt = {'topology_uuid': topology_uuid}
#            topology_devices = session.query(TopologyDeviceModel).filter_by(**filt).all()
#            if topology_devices:
#                devs = []
#                for td in topology_devices:
#                    filt = {'device_uuid': td.device_uuid}
#                    devs.append(session.query(DeviceModel).filter_by(**filt).one())
#
#            filt = {'topology_uuid': topology_uuid}
#            topology_links = session.query(TopologyLinkModel).filter_by(**filt).all()
#            if topology_links:
#                links = []
#                for tl in topology_links:
#                    filt = {'link_uuid': tl.link_uuid}
#                    links.append(session.query(LinkModel).filter_by(**filt).one())
#
#            return Topology(**result.dump(devs, links))

#    @safe_and_metered_rpc_method(METRICS, LOGGER)
#    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
#        context_uuid = 
request.topology_id.context_id.context_uuid.uuid # topology_uuid = request.topology_id.topology_uuid.uuid # with self.session() as session: # topology_add = TopologyModel(topology_uuid=topology_uuid, context_uuid=context_uuid) # updated = True # db_topology = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() # if not db_topology: # updated = False # session.merge(topology_add) # session.commit() # db_topology = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() # # for device_id in request.device_ids: # device_uuid = device_id.device_uuid.uuid # td = TopologyDeviceModel(topology_uuid=topology_uuid, device_uuid=device_uuid) # result: Tuple[TopologyDeviceModel, bool] = self.database.create_or_update(td) # # # for link_id in request.link_ids: # link_uuid = link_id.link_uuid.uuid # db_link = session.query(LinkModel).filter( # LinkModel.link_uuid == link_uuid).one_or_none() # tl = TopologyLinkModel(topology_uuid=topology_uuid, link_uuid=link_uuid) # result: Tuple[TopologyDeviceModel, bool] = self.database.create_or_update(tl) # # # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_topology_id = db_topology.dump_id() # notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) # return TopologyId(**dict_topology_id) # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: # context_uuid = request.context_id.context_uuid.uuid # topology_uuid = request.topology_uuid.uuid # # with self.session() as session: # result = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, context_uuid=context_uuid).one_or_none() # if not result: # return Empty() # dict_topology_id = result.dump_id() # # session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, 
context_uuid=context_uuid).delete() # session.commit() # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) # return Empty() # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: # for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): # yield TopologyEvent(**json.loads(message.content)) # # ----- Device ----------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: # with self.session() as session: # result = session.query(DeviceModel).all() # return DeviceIdList(device_ids=[device.dump_id() for device in result]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: # with self.session() as session: # result = session.query(DeviceModel).all() # return DeviceList(devices=[device.dump() for device in result]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: # device_uuid = request.device_uuid.uuid # with self.session() as session: # result = session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid).one_or_none() # if not result: # raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid) # # rd = result.dump(include_config_rules=True, include_drivers=True, include_endpoints=True) # # rt = Device(**rd) # # return rt # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId: # with self.session() as session: # device_uuid = request.device_id.device_uuid.uuid # # for i, endpoint in 
enumerate(request.device_endpoints): # endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid # if len(endpoint_device_uuid) == 0: # endpoint_device_uuid = device_uuid # if device_uuid != endpoint_device_uuid: # raise InvalidArgumentException( # 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, # ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) # # config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) # running_config_result = self.update_config(session, device_uuid, 'device', config_rules) # db_running_config = running_config_result[0][0] # config_uuid = db_running_config.config_uuid # running_config_rules = update_config( # self.database, device_uuid, 'device', request.device_config.config_rules) # db_running_config = running_config_rules[0][0] # # new_obj = DeviceModel(**{ # 'device_uuid' : device_uuid, # 'device_type' : request.device_type, # 'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status), # 'device_config_uuid' : config_uuid, # }) # result: Tuple[DeviceModel, bool] = self.database.create_or_update(new_obj) # db_device, updated = result # # self.set_drivers(db_device, request.device_drivers) # # for i, endpoint in enumerate(request.device_endpoints): # endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid # # endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid # # if len(endpoint_device_uuid) == 0: # # endpoint_device_uuid = device_uuid # # endpoint_attributes = { # 'device_uuid' : db_device.device_uuid, # 'endpoint_uuid': endpoint_uuid, # 'endpoint_type': endpoint.endpoint_type, # } # # endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid # endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # # str_topology_key = 
key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) # # db_topology, topo_dump = self.database.get_object(TopologyModel, endpoint_topology_uuid) # # topology_device = TopologyDeviceModel( # topology_uuid=endpoint_topology_uuid, # device_uuid=db_device.device_uuid) # self.database.create_or_update(topology_device) # # endpoint_attributes['topology_uuid'] = db_topology.topology_uuid # result : Tuple[EndPointModel, bool] = update_or_create_object( # self.database, EndPointModel, str_endpoint_key, endpoint_attributes) # db_endpoint, endpoint_updated = result # pylint: disable=unused-variable # # new_endpoint = EndPointModel(**endpoint_attributes) # result: Tuple[EndPointModel, bool] = self.database.create_or_update(new_endpoint) # db_endpoint, updated = result # # self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types) # # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_device_id = db_device.dump_id() # # notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) # # return DeviceId(**dict_device_id) # # def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types): # db_endpoint_pk = db_endpoint.endpoint_uuid # for kpi_sample_type in grpc_endpoint_kpi_sample_types: # orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) # # str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) # data = {'endpoint_uuid': db_endpoint_pk, # 'kpi_sample_type': orm_kpi_sample_type.name, # 'kpi_uuid': str(uuid.uuid4())} # db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data) # self.database.create(db_endpoint_kpi_sample_type) # # def set_drivers(self, db_device: DeviceModel, grpc_device_drivers): # db_device_pk = db_device.device_uuid # for driver in grpc_device_drivers: # orm_driver = grpc_to_enum__device_driver(driver) # str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) # 
driver_config = { # # "driver_uuid": str(uuid.uuid4()), # "device_uuid": db_device_pk, # "driver": orm_driver.name # } # db_device_driver = DriverModel(**driver_config) # db_device_driver.device_fk = db_device # db_device_driver.driver = orm_driver # # self.database.create_or_update(db_device_driver) # # def update_config( # self, session, db_parent_pk: str, config_name: str, # raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]] # ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: # # created = False # # db_config = session.query(ConfigModel).filter_by(**{ConfigModel.main_pk_name(): db_parent_pk}).one_or_none() # if not db_config: # db_config = ConfigModel() # setattr(db_config, ConfigModel.main_pk_name(), db_parent_pk) # session.add(db_config) # session.commit() # created = True # # LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump())) # # db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] # # for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): # if action == ORM_ConfigActionEnum.SET: # result : Tuple[ConfigRuleModel, bool] = self.set_config_rule( # db_config, position, resource_key, resource_value) # db_config_rule, updated = result # db_objects.append((db_config_rule, updated)) # elif action == ORM_ConfigActionEnum.DELETE: # self.delete_config_rule(db_config, resource_key) # else: # msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' # raise AttributeError( # msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) # # return db_objects # # def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str, # ): # -> Tuple[ConfigRuleModel, bool]: # # from src.context.service.database.Tools import fast_hasher # str_rule_key_hash = fast_hasher(resource_key) # str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':') # pk = 
str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key)) # data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position, # 'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value} # to_add = ConfigRuleModel(**data) # # result, updated = self.database.create_or_update(to_add) # return result, updated # # def delete_config_rule( # self, db_config: ConfigModel, resource_key: str # ) -> None: # # from src.context.service.database.Tools import fast_hasher # str_rule_key_hash = fast_hasher(resource_key) # str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') # # db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) # # if db_config_rule is None: # return # db_config_rule.delete() # # def delete_all_config_rules(self, db_config: ConfigModel) -> None: # # db_config_rule_pks = db_config.references(ConfigRuleModel) # for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete() # # """ # for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): # if action == ORM_ConfigActionEnum.SET: # result: Tuple[ConfigRuleModel, bool] = set_config_rule( # database, db_config, position, resource_key, resource_value) # db_config_rule, updated = result # db_objects.append((db_config_rule, updated)) # elif action == ORM_ConfigActionEnum.DELETE: # delete_config_rule(database, db_config, resource_key) # else: # msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' # raise AttributeError( # msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) # # return db_objects # """ # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: # device_uuid = request.device_uuid.uuid # # with self.session() as session: # db_device = 
session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none() # # session.query(TopologyDeviceModel).filter_by(device_uuid=device_uuid).delete() # session.query(ConfigRuleModel).filter_by(config_uuid=db_device.device_config_uuid).delete() # session.query(ConfigModel).filter_by(config_uuid=db_device.device_config_uuid).delete() # # if not db_device: # return Empty() # dict_device_id = db_device.dump_id() # # session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete() # session.commit() # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) ## def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: ## for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): ## yield DeviceEvent(**json.loads(message.content)) # # # # # # ----- Link ------------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList: # with self.session() as session: # result = session.query(LinkModel).all() # return LinkIdList(link_ids=[db_link.dump_id() for db_link in result]) # # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList: # with self.session() as session: # link_list = LinkList() # # db_links = session.query(LinkModel).all() # # for db_link in db_links: # link_uuid = db_link.link_uuid # filt = {'link_uuid': link_uuid} # link_endpoints = session.query(LinkEndPointModel).filter_by(**filt).all() # if link_endpoints: # eps = [] # for lep in link_endpoints: # filt = {'endpoint_uuid': lep.endpoint_uuid} # eps.append(session.query(EndPointModel).filter_by(**filt).one()) # 
link_list.links.append(Link(**db_link.dump(eps))) # # return link_list # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link: # link_uuid = request.link_uuid.uuid # with self.session() as session: # result = session.query(LinkModel).filter(LinkModel.link_uuid == link_uuid).one_or_none() # if not result: # raise NotFoundException(LinkModel.__name__.replace('Model', ''), link_uuid) # # filt = {'link_uuid': link_uuid} # link_endpoints = session.query(LinkEndPointModel).filter_by(**filt).all() # if link_endpoints: # eps = [] # for lep in link_endpoints: # filt = {'endpoint_uuid': lep.endpoint_uuid} # eps.append(session.query(EndPointModel).filter_by(**filt).one()) # return Link(**result.dump(eps)) # # rd = result.dump() # rt = Link(**rd) # # return rt # # # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId: # link_uuid = request.link_id.link_uuid.uuid # # new_link = LinkModel(**{ # 'link_uuid': link_uuid # }) # result: Tuple[LinkModel, bool] = self.database.create_or_update(new_link) # db_link, updated = result # # for endpoint_id in request.link_endpoint_ids: # endpoint_uuid = endpoint_id.endpoint_uuid.uuid # endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid # endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # # # db_topology = None # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # db_topology: TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid) # # check device is in topology # self.database.get_object(TopologyDeviceModel, endpoint_device_uuid) # # # link_endpoint = LinkEndPointModel(link_uuid=link_uuid, endpoint_uuid=endpoint_uuid) # result: Tuple[LinkEndPointModel, bool] = self.database.create_or_update(link_endpoint) # # if db_topology is 
not None: # topology_link = TopologyLinkModel(topology_uuid=endpoint_topology_uuid, link_uuid=link_uuid) # result: Tuple[TopologyLinkModel, bool] = self.database.create_or_update(topology_link) # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_link_id = db_link.dump_id() # notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) # return LinkId(**dict_link_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: # with self.session() as session: # link_uuid = request.link_uuid.uuid # # session.query(TopologyLinkModel).filter_by(link_uuid=link_uuid).delete() # session.query(LinkEndPointModel).filter_by(link_uuid=link_uuid).delete() # # result = session.query(LinkModel).filter_by(link_uuid=link_uuid).one_or_none() # if not result: # return Empty() # dict_link_id = result.dump_id() # # session.query(LinkModel).filter_by(link_uuid=link_uuid).delete() # session.commit() # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id}) # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) ## def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: ## for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): ## yield LinkEvent(**json.loads(message.content)) # # # # ----- Service ---------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList: # context_uuid = request.context_uuid.uuid # # with self.session() as session: # db_services = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all() # return ServiceIdList(service_ids=[db_service.dump_id() for db_service in 
db_services]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: # context_uuid = request.context_uuid.uuid # # with self.session() as session: # db_services = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all() # return ServiceList(services=[db_service.dump() for db_service in db_services]) # # # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service: # service_uuid = request.service_uuid.uuid # with self.session() as session: # result = session.query(ServiceModel).filter_by(service_uuid=service_uuid).one_or_none() # # if not result: # raise NotFoundException(ServiceModel.__name__.replace('Model', ''), service_uuid) # # return Service(**result.dump()) # # def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int # ) -> Tuple[Union_ConstraintModel, bool]: # with self.session() as session: # # grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint')) # # parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind) # if parser is None: # raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format( # grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint))) # # # create specific constraint # constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint) # str_constraint_id = str(uuid.uuid4()) # LOGGER.info('str_constraint_id: {}'.format(str_constraint_id)) # # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id])) # # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':') # # # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object( # # database, constraint_class, str_constraint_key, constraint_data) # constraint_data[constraint_class.main_pk_name()] = str_constraint_id # 
db_new_constraint = constraint_class(**constraint_data) # result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint) # db_specific_constraint, updated = result # # # create generic constraint # # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value) # constraint_data = { # 'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind # } # # db_new_constraint = ConstraintModel(**constraint_data) # result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint) # db_constraint, updated = result # # return db_constraint, updated # # def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints # ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]: # with self.session() as session: # # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':') # # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key) # result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none() # created = None # if result: # created = True # session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none() # db_constraints = ConstraintsModel(constraints_uuid=service_uuid) # session.add(db_constraints) # # db_objects = [(db_constraints, created)] # # for position,grpc_constraint in enumerate(grpc_constraints): # result : Tuple[ConstraintModel, bool] = self.set_constraint( # db_constraints, grpc_constraint, position) # db_constraint, updated = result # db_objects.append((db_constraint, updated)) # # return db_objects # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: # with self.lock: # with self.session() as session: # # context_uuid = request.service_id.context_id.context_uuid.uuid # # db_context : ContextModel = 
get_object(self.database, ContextModel, context_uuid) # db_context = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() # # for i,endpoint_id in enumerate(request.service_endpoint_ids): # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: # raise InvalidArgumentException( # 'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), # endpoint_topology_context_uuid, # ['should be == {:s}({:s})'.format( # 'request.service_id.context_id.context_uuid.uuid', context_uuid)]) # # service_uuid = request.service_id.service_uuid.uuid # # str_service_key = key_to_str([context_uuid, service_uuid]) # # constraints_result = self.set_constraints(service_uuid, 'constraints', request.service_constraints) # db_constraints = constraints_result[0][0] # # config_rules = grpc_config_rules_to_raw(request.service_config.config_rules) # running_config_result = update_config(self.database, str_service_key, 'running', config_rules) # db_running_config = running_config_result[0][0] # # result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, { # 'context_fk' : db_context, # 'service_uuid' : service_uuid, # 'service_type' : grpc_to_enum__service_type(request.service_type), # 'service_constraints_fk': db_constraints, # 'service_status' : grpc_to_enum__service_status(request.service_status.service_status), # 'service_config_fk' : db_running_config, # }) # db_service, updated = result # # for i,endpoint_id in enumerate(request.service_endpoint_ids): # endpoint_uuid = endpoint_id.endpoint_uuid.uuid # endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid # endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # # str_endpoint_key = 
key_to_str([endpoint_device_uuid, endpoint_uuid]) # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) # str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') # # db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) # # str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--') # result : Tuple[ServiceEndPointModel, bool] = get_or_create_object( # self.database, ServiceEndPointModel, str_service_endpoint_key, { # 'service_fk': db_service, 'endpoint_fk': db_endpoint}) # #db_service_endpoint, service_endpoint_created = result # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_service_id = db_service.dump_id() # notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) # return ServiceId(**dict_service_id) # context_uuid = request.service_id.context_id.context_uuid.uuid # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) # # for i,endpoint_id in enumerate(request.service_endpoint_ids): # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: # raise InvalidArgumentException( # 'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), # endpoint_topology_context_uuid, # ['should be == {:s}({:s})'.format( # 'request.service_id.context_id.context_uuid.uuid', context_uuid)]) # # service_uuid = request.service_id.service_uuid.uuid # str_service_key = key_to_str([context_uuid, service_uuid]) # # constraints_result = set_constraints( # self.database, str_service_key, 'service', request.service_constraints) # db_constraints = constraints_result[0][0] # # running_config_rules = update_config( # 
self.database, str_service_key, 'service', request.service_config.config_rules) # db_running_config = running_config_rules[0][0] # # result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, { # 'context_fk' : db_context, # 'service_uuid' : service_uuid, # 'service_type' : grpc_to_enum__service_type(request.service_type), # 'service_constraints_fk': db_constraints, # 'service_status' : grpc_to_enum__service_status(request.service_status.service_status), # 'service_config_fk' : db_running_config, # }) # db_service, updated = result # # for i,endpoint_id in enumerate(request.service_endpoint_ids): # endpoint_uuid = endpoint_id.endpoint_uuid.uuid # endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid # endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # # str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) # str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') # # db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) # # str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--') # result : Tuple[ServiceEndPointModel, bool] = get_or_create_object( # self.database, ServiceEndPointModel, str_service_endpoint_key, { # 'service_fk': db_service, 'endpoint_fk': db_endpoint}) # #db_service_endpoint, service_endpoint_created = result # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_service_id = db_service.dump_id() # notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) # return ServiceId(**dict_service_id) # # @safe_and_metered_rpc_method(METRICS, 
LOGGER) # def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: # with self.lock: # context_uuid = request.context_id.context_uuid.uuid # service_uuid = request.service_uuid.uuid # db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False) # found = db_service.load() # if not found: return Empty() # # dict_service_id = db_service.dump_id() # db_service.delete() # # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id}) # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) ## def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: ## for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): ## yield ServiceEvent(**json.loads(message.content)) # # # # ----- Slice ---------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList: # with self.lock: # db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) # db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) # db_slices = sorted(db_slices, key=operator.attrgetter('pk')) # return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList: # with self.lock: # db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) # db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) # db_slices = sorted(db_slices, key=operator.attrgetter('pk')) # return SliceList(slices=[db_slice.dump() for db_slice in db_slices]) # # 
@safe_and_metered_rpc_method(METRICS, LOGGER) # def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice: # with self.lock: # str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid]) # db_slice : SliceModel = get_object(self.database, SliceModel, str_key) # return Slice(**db_slice.dump( # include_endpoint_ids=True, include_constraints=True, include_config_rules=True, # include_service_ids=True, include_subslice_ids=True)) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: # with self.lock: # context_uuid = request.slice_id.context_id.context_uuid.uuid # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) # # for i,endpoint_id in enumerate(request.slice_endpoint_ids): # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: # raise InvalidArgumentException( # 'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), # endpoint_topology_context_uuid, # ['should be == {:s}({:s})'.format( # 'request.slice_id.context_id.context_uuid.uuid', context_uuid)]) # # slice_uuid = request.slice_id.slice_uuid.uuid # str_slice_key = key_to_str([context_uuid, slice_uuid]) # # constraints_result = set_constraints( # self.database, str_slice_key, 'slice', request.slice_constraints) # db_constraints = constraints_result[0][0] # # running_config_rules = update_config( # self.database, str_slice_key, 'slice', request.slice_config.config_rules) # db_running_config = running_config_rules[0][0] # # result : Tuple[SliceModel, bool] = update_or_create_object(self.database, SliceModel, str_slice_key, { # 'context_fk' : db_context, # 'slice_uuid' : slice_uuid, # 'slice_constraints_fk': db_constraints, # 'slice_status' : grpc_to_enum__slice_status(request.slice_status.slice_status), 
# 'slice_config_fk' : db_running_config, # 'slice_owner_uuid' : request.slice_owner.owner_uuid.uuid, # 'slice_owner_string' : request.slice_owner.owner_string, # }) # db_slice, updated = result # # for i,endpoint_id in enumerate(request.slice_endpoint_ids): # endpoint_uuid = endpoint_id.endpoint_uuid.uuid # endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid # endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # # str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid]) # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) # str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') # # db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key) # # str_slice_endpoint_key = key_to_str([str_slice_key, str_endpoint_key], separator='--') # result : Tuple[SliceEndPointModel, bool] = get_or_create_object( # self.database, SliceEndPointModel, str_slice_endpoint_key, { # 'slice_fk': db_slice, 'endpoint_fk': db_endpoint}) # #db_slice_endpoint, slice_endpoint_created = result # # for i,service_id in enumerate(request.slice_service_ids): # service_uuid = service_id.service_uuid.uuid # service_context_uuid = service_id.context_id.context_uuid.uuid # str_service_key = key_to_str([service_context_uuid, service_uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key) # # str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--') # result : Tuple[SliceServiceModel, bool] = get_or_create_object( # self.database, SliceServiceModel, str_slice_service_key, { # 'slice_fk': db_slice, 'service_fk': db_service}) # #db_slice_service, slice_service_created = result # # for i,subslice_id in enumerate(request.slice_subslice_ids): # 
subslice_uuid = subslice_id.slice_uuid.uuid # subslice_context_uuid = subslice_id.context_id.context_uuid.uuid # str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid]) # db_subslice : SliceModel = get_object(self.database, SliceModel, str_subslice_key) # # str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--') # result : Tuple[SliceSubSliceModel, bool] = get_or_create_object( # self.database, SliceSubSliceModel, str_slice_subslice_key, { # 'slice_fk': db_slice, 'sub_slice_fk': db_subslice}) # #db_slice_subslice, slice_subslice_created = result # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_slice_id = db_slice.dump_id() # notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id}) # return SliceId(**dict_slice_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def UnsetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: # with self.lock: # context_uuid = request.slice_id.context_id.context_uuid.uuid # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) # # for i,endpoint_id in enumerate(request.slice_endpoint_ids): # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid: # raise InvalidArgumentException( # 'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i), # endpoint_topology_context_uuid, # ['should be == {:s}({:s})'.format( # 'request.slice_id.context_id.context_uuid.uuid', context_uuid)]) # # slice_uuid = request.slice_id.slice_uuid.uuid # str_slice_key = key_to_str([context_uuid, slice_uuid]) # # if len(request.slice_constraints) > 0: # raise NotImplementedError('UnsetSlice: removal of constraints') # if len(request.slice_config.config_rules) > 0: # raise NotImplementedError('UnsetSlice: removal of config rules') # if 
len(request.slice_endpoint_ids) > 0: # raise NotImplementedError('UnsetSlice: removal of endpoints') # # updated = False # # for service_id in request.slice_service_ids: # service_uuid = service_id.service_uuid.uuid # service_context_uuid = service_id.context_id.context_uuid.uuid # str_service_key = key_to_str([service_context_uuid, service_uuid]) # str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--') # SliceServiceModel(self.database, str_slice_service_key).delete() # updated = True # # for subslice_id in request.slice_subslice_ids: # subslice_uuid = subslice_id.slice_uuid.uuid # subslice_context_uuid = subslice_id.context_id.context_uuid.uuid # str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid]) # str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--') # SliceSubSliceModel(self.database, str_slice_subslice_key).delete() # updated = True # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # db_slice : SliceModel = get_object(self.database, SliceModel, str_slice_key) # dict_slice_id = db_slice.dump_id() # notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id}) # return SliceId(**dict_slice_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty: # with self.lock: # context_uuid = request.context_id.context_uuid.uuid # slice_uuid = request.slice_uuid.uuid # db_slice = SliceModel(self.database, key_to_str([context_uuid, slice_uuid]), auto_load=False) # found = db_slice.load() # if not found: return Empty() # # dict_slice_id = db_slice.dump_id() # db_slice.delete() # # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id}) # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) ## def GetSliceEvents(self, request: Empty, context : 
grpc.ServicerContext) -> Iterator[SliceEvent]: ## for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): ## yield SliceEvent(**json.loads(message.content)) # # # # ----- Connection ------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: # with self.session() as session: # result = session.query(DeviceModel).all() # return DeviceIdList(device_ids=[device.dump_id() for device in result]) # # with self.lock: # str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) # db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel) # db_connections = sorted(db_connections, key=operator.attrgetter('pk')) # return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList: # with self.lock: # str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) # db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel) # db_connections = sorted(db_connections, key=operator.attrgetter('pk')) # return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection: # with self.lock: # db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid) # return
Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True)) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: # with self.lock: # connection_uuid = request.connection_id.connection_uuid.uuid # # connection_attributes = {'connection_uuid': connection_uuid} # # service_context_uuid = request.service_id.context_id.context_uuid.uuid # service_uuid = request.service_id.service_uuid.uuid # if len(service_context_uuid) > 0 and len(service_uuid) > 0: # str_service_key = key_to_str([service_context_uuid, service_uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key) # connection_attributes['service_fk'] = db_service # # path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '') # db_path = path_hops_result[0] # connection_attributes['path_fk'] = db_path # # result : Tuple[ConnectionModel, bool] = update_or_create_object( # self.database, ConnectionModel, connection_uuid, connection_attributes) # db_connection, updated = result # # for sub_service_id in request.sub_service_ids: # sub_service_uuid = sub_service_id.service_uuid.uuid # sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid # str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key) # # str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--') # result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object( # self.database, ConnectionSubServiceModel, str_connection_sub_service_key, { # 'connection_fk': db_connection, 'sub_service_fk': db_service}) # #db_connection_sub_service, connection_sub_service_created = result # # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_connection_id = 
db_connection.dump_id() # notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id}) # return ConnectionId(**dict_connection_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: # with self.lock: # db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False) # found = db_connection.load() # if not found: return Empty() # # dict_connection_id = db_connection.dump_id() # db_connection.delete() # # event_type = EventTypeEnum.EVENTTYPE_REMOVE # notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id}) # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) ## def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: ## for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT): ## yield ConnectionEvent(**json.loads(message.content)) # # # # ----- Policy ----------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListPolicyRuleIds(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleIdList: # with self.lock: # db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel) # db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk')) # return PolicyRuleIdList(policyRuleIdList=[db_policy_rule.dump_id() for db_policy_rule in db_policy_rules]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def ListPolicyRules(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleList: # with self.lock: # db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel) # db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk')) # return 
PolicyRuleList(policyRules=[db_policy_rule.dump() for db_policy_rule in db_policy_rules]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def GetPolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule: # with self.lock: # policy_rule_uuid = request.uuid.uuid # db_policy_rule: PolicyRuleModel = get_object(self.database, PolicyRuleModel, policy_rule_uuid) # return PolicyRule(**db_policy_rule.dump()) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def SetPolicyRule(self, request: PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId: # with self.lock: # policy_rule_type = request.WhichOneof('policy_rule') # policy_rule_json = grpc_message_to_json(request) # policy_rule_uuid = policy_rule_json[policy_rule_type]['policyRuleBasic']['policyRuleId']['uuid']['uuid'] # result: Tuple[PolicyRuleModel, bool] = update_or_create_object( # self.database, PolicyRuleModel, policy_rule_uuid, {'value': json.dumps(policy_rule_json)}) # db_policy, updated = result # pylint: disable=unused-variable # # #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE # dict_policy_id = db_policy.dump_id() # #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id}) # return PolicyRuleId(**dict_policy_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) # def RemovePolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> Empty: # with self.lock: # policy_uuid = request.uuid.uuid # db_policy = PolicyRuleModel(self.database, policy_uuid, auto_load=False) # found = db_policy.load() # if not found: return Empty() # # dict_policy_id = db_policy.dump_id() # db_policy.delete() # #event_type = EventTypeEnum.EVENTTYPE_REMOVE # #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id}) # return Empty() #