diff --git a/scripts/run_tests_locally-context.sh b/scripts/run_tests_locally-context.sh index ec12d8a80b9c48d5af6864fd0a95db966155e5da..61f8cee916374b228ca1bcee8810bd299837a2f5 100755 --- a/scripts/run_tests_locally-context.sh +++ b/scripts/run_tests_locally-context.sh @@ -36,14 +36,16 @@ cd $PROJECTDIR/src #export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}') #export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}') -export CRDB_URI="cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs?sslmode=require" +#export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs_test?sslmode=require" +export CRDB_URI="cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs_test?sslmode=require" export PYTHONPATH=/home/tfs/tfs-ctrl/src # Run unitary tests and analyze coverage of code at same time #coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \ # context/tests/test_unitary.py -pytest --log-level=INFO --verbose -o log_cli=true --maxfail=1 \ +# --log-level=INFO -o log_cli=true +pytest --verbose --maxfail=1 --durations=0 \ context/tests/test_unitary.py #kubectl --namespace $TFS_K8S_NAMESPACE delete service redis-tests diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index 6db5b99e7b1c6148cfe72c6d3e393e8841944802..2661f25c12a5797a1bf11ec51551569a36c5c50f 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -15,7 +15,7 @@ import grpc, json, logging, operator, sqlalchemy, threading, time, uuid from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker -#from sqlalchemy.dialects.postgresql import UUID, insert +from sqlalchemy.dialects.postgresql import UUID, insert from sqlalchemy_cockroachdb import run_transaction from typing import Dict, Iterator, List, Optional, Set, Tuple, Union from common.message_broker.MessageBroker import MessageBroker @@ -45,16 +45,16 @@ from common.rpc_method_wrapper.ServiceExceptions import ( #from context.service.database.ConstraintModel import ( # ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints) from context.service.database.ContextModel import ContextModel -#from context.service.database.DeviceModel import ( -# DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel) -#from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types +from context.service.database.DeviceModel import ( + DeviceModel, grpc_to_enum__device_operational_status, grpc_to_enum__device_driver) +from context.service.database.EndPointModel import EndPointModel, grpc_to_enum__kpi_sample_type +#from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types #from context.service.database.Events import notify_event -#from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type #from context.service.database.LinkModel import LinkModel #from context.service.database.PolicyRuleModel import PolicyRuleModel -#from context.service.database.RelationModels import ( +from context.service.database.RelationModels import TopologyDeviceModel # ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel, -# SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel) +# SliceSubSliceModel, TopologyLinkModel) #from context.service.database.ServiceModel import ( # ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type) #from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status @@ -94,34 +94,34 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Context ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) - def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList: + def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel).all() + #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() return [obj.dump_id() for obj in obj_list] return ContextIdList(context_ids=run_transaction(sessionmaker(bind=self.db_engine), callback)) @safe_and_metered_rpc_method(METRICS, LOGGER) - def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList: + def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel).all() + #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() return [obj.dump() for obj in obj_list] return ContextList(contexts=run_transaction(sessionmaker(bind=self.db_engine), callback)) @safe_and_metered_rpc_method(METRICS, LOGGER) - def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: + def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context: context_uuid = request.context_uuid.uuid def callback(session : Session) -> Optional[Dict]: - obj : Optional[ContextModel] = session\ - .query(ContextModel)\ - .filter_by(context_uuid=context_uuid)\ - .one_or_none() + obj : Optional[ContextModel] = session.query(ContextModel)\ + .filter_by(context_uuid=context_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=self.db_engine), callback) if obj is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) return Context(**obj) @safe_and_metered_rpc_method(METRICS, LOGGER) - def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: + def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId: context_uuid = request.context_id.context_uuid.uuid context_name = request.name @@ -147,15 +147,16 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) def callback(session : Session) -> Tuple[Optional[Dict], bool]: - obj : Optional[ContextModel] = \ - session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none() + obj : Optional[ContextModel] = session.query(ContextModel).with_for_update()\ + .filter_by(context_uuid=context_uuid).one_or_none() is_update = obj is not None if is_update: obj.context_name = context_name session.merge(obj) else: session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time())) - obj = session.get(ContextModel, {'context_uuid': context_uuid}) + obj : Optional[ContextModel] = session.query(ContextModel)\ + .filter_by(context_uuid=context_uuid).one_or_none() return (None if obj is None else obj.dump_id()), is_update obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback) @@ -166,7 +167,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer return ContextId(**obj_id) @safe_and_metered_rpc_method(METRICS, LOGGER) - def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: + def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty: context_uuid = request.context_uuid.uuid def callback(session : Session) -> bool: @@ -179,7 +180,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) - def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: + def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: pass #for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): # yield ContextEvent(**json.loads(message.content)) @@ -201,174 +202,232 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Topology --------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS, LOGGER) - def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList: + def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList: context_uuid = request.context_uuid.uuid - def callback(session : Session) -> List[Dict]: obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all() #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() return [obj.dump_id() for obj in obj_list] - - #with self.session() as session: - # result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() - # if not result: - # raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) - # db_topologies = result.topology return TopologyIdList(topology_ids=run_transaction(sessionmaker(bind=self.db_engine), callback)) @safe_and_metered_rpc_method(METRICS, LOGGER) - def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList: + def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList: context_uuid = request.context_uuid.uuid - def callback(session : Session) -> List[Dict]: obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all() #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() return [obj.dump() for obj in obj_list] - - #with self.session() as session: - # result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by( - # context_uuid=context_uuid).one_or_none() - # if not result: - # raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) - # db_topologies = result.topology return TopologyList(topologies=run_transaction(sessionmaker(bind=self.db_engine), callback)) @safe_and_metered_rpc_method(METRICS, LOGGER) - def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology: + def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology: context_uuid = request.context_id.context_uuid.uuid topology_uuid = request.topology_uuid.uuid def callback(session : Session) -> Optional[Dict]: - obj : Optional[TopologyModel] = session\ - .query(TopologyModel)\ - .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid)\ - .one_or_none() + obj : Optional[TopologyModel] = session.query(TopologyModel)\ + .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=self.db_engine), callback) - if obj is None: raise NotFoundException(TopologyModel.__name__.replace('Model', ''), context_uuid) + if obj is None: + obj_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid) + raise NotFoundException(TopologyModel.__name__.replace('Model', ''), obj_uuid) return Topology(**obj) -# result, dump = self.database.get_object(TopologyModel, topology_uuid, True) -# with self.session() as session: -# devs = None -# links = None -# -# filt = {'topology_uuid': topology_uuid} -# topology_devices = session.query(TopologyDeviceModel).filter_by(**filt).all() -# if topology_devices: -# devs = [] -# for td in topology_devices: -# filt = {'device_uuid': td.device_uuid} -# devs.append(session.query(DeviceModel).filter_by(**filt).one()) -# -# filt = {'topology_uuid': topology_uuid} -# topology_links = session.query(TopologyLinkModel).filter_by(**filt).all() -# if topology_links: -# links = [] -# for tl in topology_links: -# filt = {'link_uuid': tl.link_uuid} -# links.append(session.query(LinkModel).filter_by(**filt).one()) -# -# return Topology(**result.dump(devs, links)) + @safe_and_metered_rpc_method(METRICS, LOGGER) + def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId: + context_uuid = request.topology_id.context_id.context_uuid.uuid + topology_uuid = request.topology_id.topology_uuid.uuid + topology_name = request.name -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: -# context_uuid = request.topology_id.context_id.context_uuid.uuid -# topology_uuid = request.topology_id.topology_uuid.uuid -# with self.session() as session: -# topology_add = TopologyModel(topology_uuid=topology_uuid, context_uuid=context_uuid) -# updated = True -# db_topology = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() -# if not db_topology: -# updated = False -# session.merge(topology_add) -# session.commit() -# db_topology = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).one_or_none() -# -# for device_id in request.device_ids: -# device_uuid = device_id.device_uuid.uuid -# td = TopologyDeviceModel(topology_uuid=topology_uuid, device_uuid=device_uuid) -# result: Tuple[TopologyDeviceModel, bool] = self.database.create_or_update(td) -# -# -# for link_id in request.link_ids: -# link_uuid = link_id.link_uuid.uuid -# db_link = session.query(LinkModel).filter( -# LinkModel.link_uuid == link_uuid).one_or_none() -# tl = TopologyLinkModel(topology_uuid=topology_uuid, link_uuid=link_uuid) -# result: Tuple[TopologyDeviceModel, bool] = self.database.create_or_update(tl) -# -# -# -# event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE -# dict_topology_id = db_topology.dump_id() -# notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) -# return TopologyId(**dict_topology_id) + devices_to_add : List[str] = [ + {'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'device_uuid': device_id.device_uuid.uuid} + for device_id in request.device_ids + ] + links_to_add : List[str] = [ + {'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'link_uuid': link_id.link_uuid.uuid} + for link_id in request.link_ids + ] + print('devices_to_add', devices_to_add) -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: -# context_uuid = request.context_id.context_uuid.uuid -# topology_uuid = request.topology_uuid.uuid -# -# with self.session() as session: -# result = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, context_uuid=context_uuid).one_or_none() -# if not result: -# return Empty() -# dict_topology_id = result.dump_id() -# -# session.query(TopologyModel).filter_by(topology_uuid=topology_uuid, context_uuid=context_uuid).delete() -# session.commit() -# event_type = EventTypeEnum.EVENTTYPE_REMOVE -# notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id}) -# return Empty() + def callback(session : Session) -> Tuple[Optional[Dict], bool]: + topology_data = [{ + 'context_uuid' : context_uuid, + 'topology_uuid': topology_uuid, + 'topology_name': topology_name, + 'created_at' : time.time(), + }] + stmt = insert(TopologyModel).values(topology_data) + stmt = stmt.on_conflict_do_update( + index_elements=[TopologyModel.context_uuid, TopologyModel.topology_uuid], + set_=dict(topology_name = stmt.excluded.topology_name) + ) + session.execute(stmt) -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: -# for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): -# yield TopologyEvent(**json.loads(message.content)) + if len(devices_to_add) > 0: + session.execute(insert(TopologyDeviceModel).values(devices_to_add).on_conflict_do_nothing( + index_elements=[ + TopologyDeviceModel.context_uuid, TopologyDeviceModel.topology_uuid, + TopologyDeviceModel.device_uuid + ] + )) + + #if len(link_to_add) > 0: + # session.execute(insert(TopologyLinkModel).values(link_to_add).on_conflict_do_nothing( + # index_elements=[ + # TopologyLinkModel.context_uuid, TopologyLinkModel.topology_uuid, + # TopologyLinkModel.link_uuid + # ] + # )) + + is_update = True + obj : Optional[TopologyModel] = session.query(TopologyModel)\ + .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).one_or_none() + return (None if obj is None else obj.dump_id()), is_update + + obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback) + if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) + + #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': obj_id}) + return TopologyId(**obj_id) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + context_uuid = request.context_id.context_uuid.uuid + topology_uuid = request.topology_uuid.uuid + + def callback(session : Session) -> bool: + num_deleted = session.query(TopologyModel)\ + .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).delete() + return num_deleted > 0 + + deleted = run_transaction(sessionmaker(bind=self.db_engine), callback) + #if deleted: + # notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request}) + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: + pass + #for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): + # yield TopologyEvent(**json.loads(message.content)) + + # ----- Device ----------------------------------------------------------------------------------------------------- + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList: + def callback(session : Session) -> List[Dict]: + obj_list : List[DeviceModel] = session.query(DeviceModel).all() + #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() + return [obj.dump_id() for obj in obj_list] + return DeviceIdList(device_ids=run_transaction(sessionmaker(bind=self.db_engine), callback)) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList: + def callback(session : Session) -> List[Dict]: + obj_list : List[DeviceModel] = session.query(DeviceModel).all() + #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none() + return [obj.dump() for obj in obj_list] + return DeviceList(devices=run_transaction(sessionmaker(bind=self.db_engine), callback)) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device: + device_uuid = request.device_uuid.uuid + def callback(session : Session) -> Optional[Dict]: + obj : Optional[DeviceModel] = session.query(DeviceModel)\ + .filter_by(device_uuid=device_uuid).one_or_none() + return None if obj is None else obj.dump() + obj = run_transaction(sessionmaker(bind=self.db_engine), callback) + if obj is None: raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid) + return Device(**obj) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: + device_uuid = request.device_id.device_uuid.uuid + device_name = request.name + device_type = request.device_type + oper_status = grpc_to_enum__device_operational_status(request.device_operational_status) + device_drivers = [grpc_to_enum__device_driver(d) for d in request.device_drivers] + + related_topology_uuids : Set[Tuple[str, str]] = set() + endpoints_data : List[Dict] = list() + for i, endpoint in enumerate(request.device_endpoints): + endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid + if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid + if device_uuid != endpoint_device_uuid: + raise InvalidArgumentException( + 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, + ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) + + endpoint_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid + endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid + + kpi_sample_types = [grpc_to_enum__kpi_sample_type(kst) for kst in endpoint.kpi_sample_types] + + endpoints_data.append({ + 'context_uuid' : endpoint_context_uuid, + 'topology_uuid' : endpoint_topology_uuid, + 'device_uuid' : endpoint_device_uuid, + 'endpoint_uuid' : endpoint.endpoint_id.endpoint_uuid.uuid, + 'endpoint_type' : endpoint.endpoint_type, + 'kpi_sample_types': kpi_sample_types, + }) + + if len(endpoint_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: + related_topology_uuids.add({ + 'context_uuid': endpoint_context_uuid, + 'topology_uuid': endpoint_topology_uuid, + 'device_uuid': endpoint_device_uuid, + }) + + def callback(session : Session) -> Tuple[Optional[Dict], bool]: + obj : Optional[DeviceModel] = session.query(DeviceModel).with_for_update()\ + .filter_by(device_uuid=device_uuid).one_or_none() + is_update = obj is not None + if is_update: + obj.device_name = device_name + obj.device_type = device_type + obj.device_operational_status = oper_status + obj.device_drivers = device_drivers + session.merge(obj) + else: + session.add(DeviceModel( + device_uuid=device_uuid, device_name=device_name, device_type=device_type, + device_operational_status=oper_status, device_drivers=device_drivers, created_at=time.time())) + obj : Optional[DeviceModel] = session.query(DeviceModel)\ + .filter_by(device_uuid=device_uuid).one_or_none() + + stmt = insert(EndPointModel).values(endpoints_data) + stmt = stmt.on_conflict_do_update( + index_elements=[ + EndPointModel.context_uuid, EndPointModel.topology_uuid, EndPointModel.device_uuid, + EndPointModel.endpoint_uuid + ], + set_=dict( + endpoint_type = stmt.excluded.endpoint_type, + kpi_sample_types = stmt.excluded.kpi_sample_types, + ) + ) + session.execute(stmt) + + session.execute(insert(TopologyDeviceModel).values(list(related_topology_uuids)).on_conflict_do_nothing( + index_elements=[ + TopologyDeviceModel.context_uuid, TopologyDeviceModel.topology_uuid, + TopologyDeviceModel.device_uuid + ] + )) + + return (None if obj is None else obj.dump_id()), is_update + + obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback) + if obj_id is None: raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid) + + #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': obj_id}) + return DeviceId(**obj_id) -# # ----- Device ----------------------------------------------------------------------------------------------------- -# -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: -# with self.session() as session: -# result = session.query(DeviceModel).all() -# return DeviceIdList(device_ids=[device.dump_id() for device in result]) -# -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: -# with self.session() as session: -# result = session.query(DeviceModel).all() -# return DeviceList(devices=[device.dump() for device in result]) -# -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: -# device_uuid = request.device_uuid.uuid -# with self.session() as session: -# result = session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid).one_or_none() -# if not result: -# raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid) -# -# rd = result.dump(include_config_rules=True, include_drivers=True, include_endpoints=True) -# -# rt = Device(**rd) -# -# return rt -# -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId: # with self.session() as session: -# device_uuid = request.device_id.device_uuid.uuid -# -# for i, endpoint in enumerate(request.device_endpoints): -# endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid -# if len(endpoint_device_uuid) == 0: -# endpoint_device_uuid = device_uuid -# if device_uuid != endpoint_device_uuid: -# raise InvalidArgumentException( -# 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, -# ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) -# # config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) # running_config_result = self.update_config(session, device_uuid, 'device', config_rules) # db_running_config = running_config_result[0][0] @@ -388,198 +447,42 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # self.set_drivers(db_device, request.device_drivers) # -# for i, endpoint in enumerate(request.device_endpoints): -# endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid -# # endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid -# # if len(endpoint_device_uuid) == 0: -# # endpoint_device_uuid = device_uuid -# -# endpoint_attributes = { -# 'device_uuid' : db_device.device_uuid, -# 'endpoint_uuid': endpoint_uuid, -# 'endpoint_type': endpoint.endpoint_type, -# } -# -# endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid -# endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid -# if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: -# # str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) -# -# db_topology, topo_dump = self.database.get_object(TopologyModel, endpoint_topology_uuid) -# -# topology_device = TopologyDeviceModel( -# topology_uuid=endpoint_topology_uuid, -# device_uuid=db_device.device_uuid) -# self.database.create_or_update(topology_device) -# -# endpoint_attributes['topology_uuid'] = db_topology.topology_uuid -# result : Tuple[EndPointModel, bool] = update_or_create_object( -# self.database, EndPointModel, str_endpoint_key, endpoint_attributes) -# db_endpoint, endpoint_updated = result # pylint: disable=unused-variable -# -# new_endpoint = EndPointModel(**endpoint_attributes) -# result: Tuple[EndPointModel, bool] = self.database.create_or_update(new_endpoint) -# db_endpoint, updated = result -# -# self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types) -# -# # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE -# dict_device_id = db_device.dump_id() -# # notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) -# -# return DeviceId(**dict_device_id) -# -# def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types): -# db_endpoint_pk = db_endpoint.endpoint_uuid -# for kpi_sample_type in grpc_endpoint_kpi_sample_types: -# orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) -# # str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) -# data = {'endpoint_uuid': db_endpoint_pk, -# 'kpi_sample_type': orm_kpi_sample_type.name, -# 'kpi_uuid': str(uuid.uuid4())} -# db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data) -# self.database.create(db_endpoint_kpi_sample_type) -# -# def set_drivers(self, db_device: DeviceModel, grpc_device_drivers): -# db_device_pk = db_device.device_uuid -# for driver in grpc_device_drivers: -# orm_driver = grpc_to_enum__device_driver(driver) -# str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) -# driver_config = { -# # "driver_uuid": str(uuid.uuid4()), -# "device_uuid": db_device_pk, -# "driver": orm_driver.name -# } -# db_device_driver = DriverModel(**driver_config) -# db_device_driver.device_fk = db_device -# db_device_driver.driver = orm_driver -# -# self.database.create_or_update(db_device_driver) -# -# def update_config( -# self, session, db_parent_pk: str, config_name: str, -# raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]] -# ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: -# -# created = False -# -# db_config = session.query(ConfigModel).filter_by(**{ConfigModel.main_pk_name(): db_parent_pk}).one_or_none() -# if not db_config: -# db_config = ConfigModel() -# setattr(db_config, ConfigModel.main_pk_name(), db_parent_pk) -# session.add(db_config) -# session.commit() -# created = True -# -# LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump())) -# -# db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] -# -# for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): -# if action == ORM_ConfigActionEnum.SET: -# result : Tuple[ConfigRuleModel, bool] = self.set_config_rule( -# db_config, position, resource_key, resource_value) -# db_config_rule, updated = result -# db_objects.append((db_config_rule, updated)) -# elif action == ORM_ConfigActionEnum.DELETE: -# self.delete_config_rule(db_config, resource_key) -# else: -# msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' -# raise AttributeError( -# msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) -# -# return db_objects -# -# def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str, -# ): # -> Tuple[ConfigRuleModel, bool]: -# -# from src.context.service.database.Tools import fast_hasher -# str_rule_key_hash = fast_hasher(resource_key) -# str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':') -# pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key)) -# data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position, -# 'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value} -# to_add = ConfigRuleModel(**data) -# -# result, updated = self.database.create_or_update(to_add) -# return result, updated -# -# def delete_config_rule( -# self, db_config: ConfigModel, resource_key: str -# ) -> None: -# -# from src.context.service.database.Tools import fast_hasher -# str_rule_key_hash = fast_hasher(resource_key) -# str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') -# -# db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) -# -# if db_config_rule is None: -# return -# db_config_rule.delete() -# -# def delete_all_config_rules(self, db_config: ConfigModel) -> None: -# -# db_config_rule_pks = db_config.references(ConfigRuleModel) -# for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete() -# -# """ -# for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): -# if action == ORM_ConfigActionEnum.SET: -# result: Tuple[ConfigRuleModel, bool] = set_config_rule( -# database, db_config, position, resource_key, resource_value) -# db_config_rule, updated = result -# db_objects.append((db_config_rule, updated)) -# elif action == ORM_ConfigActionEnum.DELETE: -# delete_config_rule(database, db_config, resource_key) -# else: -# msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' -# raise AttributeError( -# msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) -# -# return db_objects -# """ -# -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: -# device_uuid = request.device_uuid.uuid -# -# with self.session() as session: -# db_device = session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none() -# -# session.query(TopologyDeviceModel).filter_by(device_uuid=device_uuid).delete() -# session.query(ConfigRuleModel).filter_by(config_uuid=db_device.device_config_uuid).delete() -# session.query(ConfigModel).filter_by(config_uuid=db_device.device_config_uuid).delete() -# -# if not db_device: -# return Empty() -# dict_device_id = db_device.dump_id() -# -# session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete() -# session.commit() -# event_type = EventTypeEnum.EVENTTYPE_REMOVE -# notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) -# return Empty() -# -## @safe_and_metered_rpc_method(METRICS, LOGGER) -## def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: -## for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): -## yield DeviceEvent(**json.loads(message.content)) -# -# -# # + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: + device_uuid = request.device_uuid.uuid + def callback(session : Session) -> bool: + session.query(TopologyDeviceModel).filter_by(device_uuid=device_uuid).delete() + num_deleted = session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete() + #db_device = session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none() + #session.query(ConfigRuleModel).filter_by(config_uuid=db_device.device_config_uuid).delete() + #session.query(ConfigModel).filter_by(config_uuid=db_device.device_config_uuid).delete() + #session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete() + return num_deleted > 0 + deleted = run_transaction(sessionmaker(bind=self.db_engine), callback) + #if deleted: + # notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request}) + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: + pass + #for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): + # yield DeviceEvent(**json.loads(message.content)) + + # # ----- Link ------------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList: +# def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList: # with self.session() as session: # result = session.query(LinkModel).all() # return LinkIdList(link_ids=[db_link.dump_id() for db_link in result]) # # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList: +# def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: # with self.session() as session: # link_list = LinkList() # @@ -599,7 +502,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return link_list # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link: +# def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: # link_uuid = request.link_uuid.uuid # with self.session() as session: # result = session.query(LinkModel).filter(LinkModel.link_uuid == link_uuid).one_or_none() @@ -623,7 +526,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId: +# def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: # link_uuid = request.link_id.link_uuid.uuid # # new_link = LinkModel(**{ @@ -659,7 +562,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return LinkId(**dict_link_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty: +# def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: # with self.session() as session: # link_uuid = request.link_uuid.uuid # @@ -678,7 +581,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) -## def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: +## def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: ## for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): ## yield LinkEvent(**json.loads(message.content)) # @@ -686,7 +589,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # ----- Service ---------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList: +# def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList: # context_uuid = request.context_uuid.uuid # # with self.session() as session: @@ -694,7 +597,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: +# def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList: # context_uuid = request.context_uuid.uuid # # with self.session() as session: @@ -704,7 +607,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service: +# def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service: # service_uuid = request.service_uuid.uuid # with self.session() as session: # result = session.query(ServiceModel).filter_by(service_uuid=service_uuid).one_or_none() @@ -775,7 +678,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return db_objects # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: +# def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: # with self.lock: # with self.session() as session: # @@ -893,7 +796,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return ServiceId(**dict_service_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: +# def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: # with self.lock: # context_uuid = request.context_id.context_uuid.uuid # service_uuid = request.service_uuid.uuid @@ -909,7 +812,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) -## def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: +## def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: ## for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): ## yield ServiceEvent(**json.loads(message.content)) # @@ -917,7 +820,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # ----- Slice ---------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList: +# def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList: # with self.lock: # db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) # db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) @@ -925,7 +828,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList: +# def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList: # with self.lock: # db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid) # db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel) @@ -933,7 +836,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return SliceList(slices=[db_slice.dump() for db_slice in db_slices]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice: +# def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice: # with self.lock: # str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid]) # db_slice : SliceModel = get_object(self.database, SliceModel, str_key) @@ -942,7 +845,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # include_service_ids=True, include_subslice_ids=True)) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: +# def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: # with self.lock: # context_uuid = request.slice_id.context_id.context_uuid.uuid # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) @@ -1027,7 +930,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return SliceId(**dict_slice_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def UnsetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: +# def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: # with self.lock: # context_uuid = request.slice_id.context_id.context_uuid.uuid # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) @@ -1076,7 +979,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return SliceId(**dict_slice_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty: +# def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: # with self.lock: # context_uuid = request.context_id.context_uuid.uuid # slice_uuid = request.slice_uuid.uuid @@ -1092,7 +995,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) -## def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: +## def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: ## for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): ## yield SliceEvent(**json.loads(message.content)) # @@ -1100,7 +1003,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # ----- Connection ------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: +# def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: # with self.session() as session: # result = session.query(DeviceModel).all() # return DeviceIdList(device_ids=[device.dump_id() for device in result]) @@ -1113,7 +1016,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListConnections(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList: +# def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList: # with self.lock: # str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid]) # db_service : ServiceModel = get_object(self.database, ServiceModel, str_key) @@ -1122,13 +1025,13 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection: +# def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection: # with self.lock: # db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid) # return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True)) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId: +# def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId: # with self.lock: # connection_uuid = request.connection_id.connection_uuid.uuid # @@ -1167,7 +1070,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return ConnectionId(**dict_connection_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty: +# def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty: # with self.lock: # db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False) # found = db_connection.load() @@ -1181,7 +1084,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return Empty() # ## @safe_and_metered_rpc_method(METRICS, LOGGER) -## def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: +## def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: ## for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT): ## yield ConnectionEvent(**json.loads(message.content)) # @@ -1189,28 +1092,28 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # # ----- Policy ----------------------------------------------------------------------------------------------------- # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListPolicyRuleIds(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleIdList: +# def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList: # with self.lock: # db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel) # db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk')) # return PolicyRuleIdList(policyRuleIdList=[db_policy_rule.dump_id() for db_policy_rule in db_policy_rules]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def ListPolicyRules(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleList: +# def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList: # with self.lock: # db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel) # db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk')) # return PolicyRuleList(policyRules=[db_policy_rule.dump() for db_policy_rule in db_policy_rules]) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetPolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule: +# def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule: # with self.lock: # policy_rule_uuid = request.uuid.uuid # db_policy_rule: PolicyRuleModel = get_object(self.database, PolicyRuleModel, policy_rule_uuid) # return PolicyRule(**db_policy_rule.dump()) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def SetPolicyRule(self, request: PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId: +# def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId: # with self.lock: # policy_rule_type = request.WhichOneof('policy_rule') # policy_rule_json = grpc_message_to_json(request) @@ -1225,7 +1128,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # return PolicyRuleId(**dict_policy_id) # # @safe_and_metered_rpc_method(METRICS, LOGGER) -# def RemovePolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> Empty: +# def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty: # with self.lock: # policy_uuid = request.uuid.uuid # db_policy = PolicyRuleModel(self.database, policy_uuid, auto_load=False) diff --git a/src/context/service/Engine.py b/src/context/service/Engine.py index 08e1e4f93f6a67a3fe43b8f672613ab78ffc0c81..ec4702f271ecec2659f7c227e9540db8a1c03e26 100644 --- a/src/context/service/Engine.py +++ b/src/context/service/Engine.py @@ -20,21 +20,31 @@ LOGGER = logging.getLogger(__name__) APP_NAME = 'tfs' class Engine: - def get_engine(self) -> sqlalchemy.engine.Engine: + @staticmethod + def get_engine() -> sqlalchemy.engine.Engine: crdb_uri = get_setting('CRDB_URI') try: engine = sqlalchemy.create_engine( - crdb_uri, connect_args={'application_name': APP_NAME}, echo=False, future=True) + crdb_uri, connect_args={'application_name': APP_NAME}, echo=True, future=True) except: # pylint: disable=bare-except LOGGER.exception('Failed to connect to database: {:s}'.format(crdb_uri)) return None try: - if not sqlalchemy_utils.database_exists(engine.url): - sqlalchemy_utils.create_database(engine.url) + Engine.create_database(engine) except: # pylint: disable=bare-except - LOGGER.exception('Failed to check/create to database: {:s}'.format(crdb_uri)) + LOGGER.exception('Failed to check/create to database: {:s}'.format(engine.url)) return None return engine + + @staticmethod + def create_database(engine : sqlalchemy.engine.Engine) -> None: + if not sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.create_database(engine.url) + + @staticmethod + def drop_database(engine : sqlalchemy.engine.Engine) -> None: + if sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.drop_database(engine.url) diff --git a/src/context/service/__main__.py b/src/context/service/__main__.py index c5bbcc3f27aef2a55fa6614adad34c8004f4fdd7..fbdabb2d7d624aa5ba97f859bd9cfd50e792b8b1 100644 --- a/src/context/service/__main__.py +++ b/src/context/service/__main__.py @@ -45,7 +45,7 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - db_engine = Engine().get_engine() + db_engine = Engine.get_engine() rebuild_database(db_engine, drop_if_exists=False) # Get message broker instance diff --git a/src/context/service/database/ContextModel.py b/src/context/service/database/ContextModel.py index 241198d3f798830ff2206911c71933307c6976aa..ae8cf995f67cba1708e1bad6d941ef2c976eb67c 100644 --- a/src/context/service/database/ContextModel.py +++ b/src/context/service/database/ContextModel.py @@ -12,15 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from typing import Dict +from typing import Dict, List from sqlalchemy import Column, Float, String from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import relationship from ._Base import _Base -LOGGER = logging.getLogger(__name__) - class ContextModel(_Base): __tablename__ = 'context' context_uuid = Column(UUID(as_uuid=False), primary_key=True) @@ -28,33 +25,20 @@ class ContextModel(_Base): created_at = Column(Float) topology = relationship('TopologyModel', back_populates='context') + #service = relationship('ServiceModel', back_populates='context') + #slice = relationship('SliceModel', back_populates='context') def dump_id(self) -> Dict: return {'context_uuid': {'uuid': self.context_uuid}} - #@staticmethod - #def main_pk_name(): - # return 'context_uuid' - - """ - def dump_service_ids(self) -> List[Dict]: - from .ServiceModel import ServiceModel # pylint: disable=import-outside-toplevel - db_service_pks = self.references(ServiceModel) - return [ServiceModel(self.database, pk).dump_id() for pk,_ in db_service_pks] - def dump_topology_ids(self) -> List[Dict]: - from .TopologyModel import TopologyModel # pylint: disable=import-outside-toplevel - db_topology_pks = self.references(TopologyModel) - return [TopologyModel(self.database, pk).dump_id() for pk,_ in db_topology_pks] - """ - - def dump(self, - include_services : bool = True, # pylint: disable=arguments-differ - include_slices : bool = True, # pylint: disable=arguments-differ - include_topologies : bool = True # pylint: disable=arguments-differ - ) -> Dict: - result = {'context_id': self.dump_id(), 'name': self.context_name} - # if include_services: result['service_ids'] = self.dump_service_ids() - # if include_slices: result['slice_ids'] = self.dump_slice_ids() - # if include_topologies: result['topology_ids'] = self.dump_topology_ids() - return result + return + + def dump(self) -> Dict: + return { + 'context_id' : self.dump_id(), + 'name' : self.context_name, + 'topology_ids': [obj.dump_id() for obj in self.topology], + #'service_ids' : [obj.dump_id() for obj in self.service ], + #'slice_ids' : [obj.dump_id() for obj in self.slice ], + } diff --git a/src/context/service/database/DeviceModel.py b/src/context/service/database/DeviceModel.py index cb568e123f7f67b84e614845180001165f3e0172..5c9e27e06ded6a4a3d31a0d356fd8621cd930dc2 100644 --- a/src/context/service/database/DeviceModel.py +++ b/src/context/service/database/DeviceModel.py @@ -11,17 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import enum import functools, logging -import uuid -from typing import Dict, List -from common.orm.Database import Database -from common.orm.backend.Tools import key_to_str +#import uuid +from typing import Dict #, List +#from common.orm.Database import Database +#from common.orm.backend.Tools import key_to_str from common.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum -from sqlalchemy import Column, ForeignKey, String, Enum +from sqlalchemy import Column, Float, ForeignKey, String, Enum from sqlalchemy.dialects.postgresql import UUID, ARRAY -from context.service.database._Base import Base from sqlalchemy.orm import relationship +from context.service.database._Base import _Base from .Tools import grpc_to_enum LOGGER = logging.getLogger(__name__) @@ -46,80 +47,152 @@ class ORM_DeviceOperationalStatusEnum(enum.Enum): grpc_to_enum__device_operational_status = functools.partial( grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum) -class DeviceModel(Base): - __tablename__ = 'Device' +class DeviceModel(_Base): + __tablename__ = 'device' device_uuid = Column(UUID(as_uuid=False), primary_key=True) - device_type = Column(String) - device_config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid", ondelete='CASCADE')) - device_operational_status = Column(Enum(ORM_DeviceOperationalStatusEnum, create_constraint=False, - native_enum=False)) + device_name = Column(String(), nullable=False) + device_type = Column(String(), nullable=False) + #device_config_uuid = Column(UUID(as_uuid=False), ForeignKey('config.config_uuid', ondelete='CASCADE')) + device_operational_status = Column(Enum(ORM_DeviceOperationalStatusEnum)) + device_drivers = Column(ARRAY(Enum(ORM_DeviceDriverEnum), dimensions=1)) + created_at = Column(Float) # Relationships - device_config = relationship("ConfigModel", passive_deletes=True, lazy="joined") - driver = relationship("DriverModel", passive_deletes=True, back_populates="device") - endpoints = relationship("EndPointModel", passive_deletes=True, back_populates="device") + topology_device = relationship('TopologyDeviceModel', back_populates='devices') + #device_config = relationship("ConfigModel", passive_deletes=True, lazy="joined") + endpoints = relationship('EndPointModel', passive_deletes=True, back_populates='device') def dump_id(self) -> Dict: return {'device_uuid': {'uuid': self.device_uuid}} - def dump_config(self) -> Dict: - return self.device_config.dump() - - def dump_drivers(self) -> List[int]: - response = [] - for a in self.driver: - response.append(a.dump()) - - return response - - def dump_endpoints(self) -> List[Dict]: - response = [] - - for a in self.endpoints: - response.append(a.dump()) - - return response - - def dump( # pylint: disable=arguments-differ - self, include_config_rules=True, include_drivers=True, include_endpoints=True - ) -> Dict: - result = { - 'device_id': self.dump_id(), - 'device_type': self.device_type, + def dump(self) -> Dict: + return { + 'device_id' : self.dump_id(), + 'name' : self.device_name, + 'device_type' : self.device_type, 'device_operational_status': self.device_operational_status.value, + 'device_drivers' : [d.value for d in self.device_drivers], + #'device_config' : {'config_rules': self.device_config.dump()}, + #'device_endpoints' : [ep.dump() for ep in self.endpoints], } - if include_config_rules: result.setdefault('device_config', {})['config_rules'] = self.dump_config() - if include_drivers: result['device_drivers'] = self.dump_drivers() - if include_endpoints: result['device_endpoints'] = self.dump_endpoints() - return result - - @staticmethod - def main_pk_name(): - return 'device_uuid' -class DriverModel(Base): # pylint: disable=abstract-method - __tablename__ = 'Driver' - # driver_uuid = Column(UUID(as_uuid=False), primary_key=True) - device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid", ondelete='CASCADE'), primary_key=True) - driver = Column(Enum(ORM_DeviceDriverEnum, create_constraint=False, native_enum=False)) - - # Relationships - device = relationship("DeviceModel", back_populates="driver") - - - def dump(self) -> Dict: - return self.driver.value - - @staticmethod - def main_pk_name(): - return 'device_uuid' +#def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers): +# db_device_pk = db_device.device_uuid +# for driver in grpc_device_drivers: +# orm_driver = grpc_to_enum__device_driver(driver) +# str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) +# db_device_driver = DriverModel(database, str_device_driver_key) +# db_device_driver.device_fk = db_device +# db_device_driver.driver = orm_driver +# db_device_driver.save() + +# def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types): +# db_endpoint_pk = db_endpoint.endpoint_uuid +# for kpi_sample_type in grpc_endpoint_kpi_sample_types: +# orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) +# # str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) +# data = {'endpoint_uuid': db_endpoint_pk, +# 'kpi_sample_type': orm_kpi_sample_type.name, +# 'kpi_uuid': str(uuid.uuid4())} +# db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data) +# self.database.create(db_endpoint_kpi_sample_type) + +# def set_drivers(self, db_device: DeviceModel, grpc_device_drivers): +# db_device_pk = db_device.device_uuid +# for driver in grpc_device_drivers: +# orm_driver = grpc_to_enum__device_driver(driver) +# str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) +# driver_config = { +# # "driver_uuid": str(uuid.uuid4()), +# "device_uuid": db_device_pk, +# "driver": orm_driver.name +# } +# db_device_driver = DriverModel(**driver_config) +# db_device_driver.device_fk = db_device +# db_device_driver.driver = orm_driver +# +# self.database.create_or_update(db_device_driver) -def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers): - db_device_pk = db_device.device_uuid - for driver in grpc_device_drivers: - orm_driver = grpc_to_enum__device_driver(driver) - str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) - db_device_driver = DriverModel(database, str_device_driver_key) - db_device_driver.device_fk = db_device - db_device_driver.driver = orm_driver - db_device_driver.save() +# def update_config( +# self, session, db_parent_pk: str, config_name: str, +# raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]] +# ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: +# +# created = False +# +# db_config = session.query(ConfigModel).filter_by(**{ConfigModel.main_pk_name(): db_parent_pk}).one_or_none() +# if not db_config: +# db_config = ConfigModel() +# setattr(db_config, ConfigModel.main_pk_name(), db_parent_pk) +# session.add(db_config) +# session.commit() +# created = True +# +# LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump())) +# +# db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] +# +# for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): +# if action == ORM_ConfigActionEnum.SET: +# result : Tuple[ConfigRuleModel, bool] = self.set_config_rule( +# db_config, position, resource_key, resource_value) +# db_config_rule, updated = result +# db_objects.append((db_config_rule, updated)) +# elif action == ORM_ConfigActionEnum.DELETE: +# self.delete_config_rule(db_config, resource_key) +# else: +# msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' +# raise AttributeError( +# msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) +# +# return db_objects +# +# def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str, +# ): # -> Tuple[ConfigRuleModel, bool]: +# +# from src.context.service.database.Tools import fast_hasher +# str_rule_key_hash = fast_hasher(resource_key) +# str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':') +# pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key)) +# data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position, +# 'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value} +# to_add = ConfigRuleModel(**data) +# +# result, updated = self.database.create_or_update(to_add) +# return result, updated +# +# def delete_config_rule( +# self, db_config: ConfigModel, resource_key: str +# ) -> None: +# +# from src.context.service.database.Tools import fast_hasher +# str_rule_key_hash = fast_hasher(resource_key) +# str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') +# +# db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) +# +# if db_config_rule is None: +# return +# db_config_rule.delete() +# +# def delete_all_config_rules(self, db_config: ConfigModel) -> None: +# +# db_config_rule_pks = db_config.references(ConfigRuleModel) +# for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete() +# +# """ +# for position, (action, resource_key, resource_value) in enumerate(raw_config_rules): +# if action == ORM_ConfigActionEnum.SET: +# result: Tuple[ConfigRuleModel, bool] = set_config_rule( +# database, db_config, position, resource_key, resource_value) +# db_config_rule, updated = result +# db_objects.append((db_config_rule, updated)) +# elif action == ORM_ConfigActionEnum.DELETE: +# delete_config_rule(database, db_config, resource_key) +# else: +# msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' +# raise AttributeError( +# msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) +# +# return db_objects +# """ diff --git a/src/context/service/database/EndPointModel.py b/src/context/service/database/EndPointModel.py index 38214aa9bd57556f617568c112786964d1b87a67..a8d3c2c699db8f54024508edc376c519509f0c03 100644 --- a/src/context/service/database/EndPointModel.py +++ b/src/context/service/database/EndPointModel.py @@ -12,93 +12,63 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from typing import Dict, List, Optional, Tuple -from common.orm.Database import Database -from common.orm.HighLevel import get_object -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import EndPointId -from .KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type -from sqlalchemy import Column, ForeignKey, String, Enum, ForeignKeyConstraint -from sqlalchemy.dialects.postgresql import UUID -from context.service.database._Base import Base +import enum, functools +from typing import Dict +from sqlalchemy import Column, String, Enum, ForeignKeyConstraint +from sqlalchemy.dialects.postgresql import ARRAY, UUID from sqlalchemy.orm import relationship -LOGGER = logging.getLogger(__name__) +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from ._Base import _Base +from .Tools import grpc_to_enum -class EndPointModel(Base): - __tablename__ = 'EndPoint' - topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid"), primary_key=True) - device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid", ondelete='CASCADE'), primary_key=True) - endpoint_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True) - endpoint_type = Column(String) +class ORM_KpiSampleTypeEnum(enum.Enum): + UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN + PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED + PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED + BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED + +grpc_to_enum__kpi_sample_type = functools.partial( + grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) - # Relationships - kpi_sample_types = relationship("KpiSampleTypeModel", passive_deletes=True, back_populates="EndPoint") - device = relationship("DeviceModel", back_populates="endpoints") +class EndPointModel(_Base): + __tablename__ = 'endpoint' + context_uuid = Column(UUID(as_uuid=False), primary_key=True) + topology_uuid = Column(UUID(as_uuid=False), primary_key=True) + device_uuid = Column(UUID(as_uuid=False), primary_key=True) + endpoint_uuid = Column(UUID(as_uuid=False), primary_key=True) + endpoint_type = Column(String) + kpi_sample_types = Column(ARRAY(Enum(ORM_KpiSampleTypeEnum), dimensions=1)) - @staticmethod - def main_pk_name(): - return 'endpoint_uuid' + __table_args__ = ( + ForeignKeyConstraint( + ['context_uuid', 'topology_uuid'], + ['topology.context_uuid', 'topology.topology_uuid'], + ondelete='CASCADE'), + ForeignKeyConstraint( + ['device_uuid'], + ['device.device_uuid'], + ondelete='CASCADE'), + ) - def delete(self) -> None: - for db_kpi_sample_type_pk,_ in self.references(KpiSampleTypeModel): - KpiSampleTypeModel(self.database, db_kpi_sample_type_pk).delete() - super().delete() + topology = relationship('TopologyModel', back_populates='endpoints') + device = relationship('DeviceModel', back_populates='endpoints') def dump_id(self) -> Dict: result = { + 'topology_id': self.topology.dump_id(), 'device_id': self.device.dump_id(), 'endpoint_uuid': {'uuid': self.endpoint_uuid}, } return result - def dump_kpi_sample_types(self) -> List[int]: - # db_kpi_sample_type_pks = self.references(KpiSampleTypeModel) - # return [KpiSampleTypeModel(self.database, pk).dump() for pk,_ in db_kpi_sample_type_pks] - response = [] - for a in self.kpi_sample_types: - response.append(a.dump()) - return response - - def dump( # pylint: disable=arguments-differ - self, include_kpi_sample_types=True - ) -> Dict: - result = { - 'endpoint_id': self.dump_id(), - 'endpoint_type': self.endpoint_type, - } - if include_kpi_sample_types: result['kpi_sample_types'] = self.dump_kpi_sample_types() - return result - - -class KpiSampleTypeModel(Base): # pylint: disable=abstract-method - __tablename__ = 'KpiSampleType' - kpi_uuid = Column(UUID(as_uuid=False), primary_key=True) - endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid", ondelete='CASCADE')) - kpi_sample_type = Column(Enum(ORM_KpiSampleTypeEnum, create_constraint=False, - native_enum=False)) - # __table_args__ = (ForeignKeyConstraint([endpoint_uuid], [EndPointModel.endpoint_uuid]), {}) - - # Relationships - EndPoint = relationship("EndPointModel", passive_deletes=True, back_populates="kpi_sample_types") - def dump(self) -> Dict: - return self.kpi_sample_type.value - - def main_pk_name(self): - return 'kpi_uuid' + return { + 'endpoint_id' : self.dump_id(), + 'endpoint_type' : self.endpoint_type, + 'kpi_sample_types': [kst.value for kst in self.kpi_sample_types], + } -""" -def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_endpoint_kpi_sample_types): - db_endpoint_pk = db_endpoint.pk - for kpi_sample_type in grpc_endpoint_kpi_sample_types: - orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) - str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) - db_endpoint_kpi_sample_type = KpiSampleTypeModel(database, str_endpoint_kpi_sample_type_key) - db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint - db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type - db_endpoint_kpi_sample_type.save() -""" # def get_endpoint( # database : Database, grpc_endpoint_id : EndPointId, # validate_topology_exists : bool = True, validate_device_in_topology : bool = True diff --git a/src/context/service/database/KpiSampleType.py b/src/context/service/database/KpiSampleType.py deleted file mode 100644 index 7f122f185d6ae7ac2a209b78ae6e4deed01582b4..0000000000000000000000000000000000000000 --- a/src/context/service/database/KpiSampleType.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -import enum -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from .Tools import grpc_to_enum - -class ORM_KpiSampleTypeEnum(enum.Enum): - UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED - -grpc_to_enum__kpi_sample_type = functools.partial( - grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) diff --git a/src/context/service/database/RelationModels.py b/src/context/service/database/RelationModels.py index 61e05db0e4f02f342bd42fad413c279427e8fa9c..bcf85d005d6e0c11cb77ea567b1bf28fa47bdc1a 100644 --- a/src/context/service/database/RelationModels.py +++ b/src/context/service/database/RelationModels.py @@ -13,39 +13,39 @@ # limitations under the License. import logging -from sqlalchemy import Column, ForeignKey +from sqlalchemy import Column, ForeignKey, ForeignKeyConstraint from sqlalchemy.dialects.postgresql import UUID -from context.service.database._Base import Base +from sqlalchemy.orm import relationship +from context.service.database._Base import _Base LOGGER = logging.getLogger(__name__) -# -# class ConnectionSubServiceModel(Model): # pylint: disable=abstract-method + +# class ConnectionSubServiceModel(Model): # pk = PrimaryKeyField() # connection_fk = ForeignKeyField(ConnectionModel) # sub_service_fk = ForeignKeyField(ServiceModel) # -class LinkEndPointModel(Base): # pylint: disable=abstract-method - __tablename__ = 'LinkEndPoint' - # uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True) - link_uuid = Column(UUID(as_uuid=False), ForeignKey("Link.link_uuid")) - endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"), primary_key=True) - - @staticmethod - def main_pk_name(): - return 'endpoint_uuid' - +#class LinkEndPointModel(Base): +# __tablename__ = 'LinkEndPoint' +# # uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True) +# link_uuid = Column(UUID(as_uuid=False), ForeignKey("Link.link_uuid")) +# endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"), primary_key=True) # -# class ServiceEndPointModel(Model): # pylint: disable=abstract-method +# @staticmethod +# def main_pk_name(): +# return 'endpoint_uuid' +# +# class ServiceEndPointModel(Model): # pk = PrimaryKeyField() # service_fk = ForeignKeyField(ServiceModel) # endpoint_fk = ForeignKeyField(EndPointModel) # -# class SliceEndPointModel(Model): # pylint: disable=abstract-method +# class SliceEndPointModel(Model): # pk = PrimaryKeyField() # slice_fk = ForeignKeyField(SliceModel) # endpoint_fk = ForeignKeyField(EndPointModel) # -# class SliceServiceModel(Model): # pylint: disable=abstract-method +# class SliceServiceModel(Model): # pk = PrimaryKeyField() # slice_fk = ForeignKeyField(SliceModel) # service_fk = ForeignKeyField(ServiceMo# pylint: disable=abstract-method @@ -55,26 +55,32 @@ class LinkEndPointModel(Base): # pylint: disable=abstract-method # endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid")) #del) # -# class SliceSubSliceModel(Model): # pylint: disable=abstract-method +# class SliceSubSliceModel(Model): # pk = PrimaryKeyField() # slice_fk = ForeignKeyField(SliceModel) # sub_slice_fk = ForeignKeyField(SliceModel) -class TopologyDeviceModel(Base): # pylint: disable=abstract-method - __tablename__ = 'TopologyDevice' - # uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True) - topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid")) - device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid"), primary_key=True) +class TopologyDeviceModel(_Base): + __tablename__ = 'topology_device' + context_uuid = Column(UUID(as_uuid=False), primary_key=True) + topology_uuid = Column(UUID(as_uuid=False), primary_key=True) + device_uuid = Column(UUID(as_uuid=False), primary_key=True) - @staticmethod - def main_pk_name(): - return 'device_uuid' -# -class TopologyLinkModel(Base): # pylint: disable=abstract-method - __tablename__ = 'TopologyLink' - topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid")) - link_uuid = Column(UUID(as_uuid=False), ForeignKey("Link.link_uuid"), primary_key=True) + topologies = relationship('TopologyModel', back_populates='topology_device') + devices = relationship('DeviceModel', back_populates='topology_device') + + __table_args__ = ( + ForeignKeyConstraint( + ['context_uuid', 'topology_uuid'], + ['topology.context_uuid', 'topology.topology_uuid'], + ondelete='CASCADE'), + ForeignKeyConstraint( + ['device_uuid'], + ['device.device_uuid'], + ondelete='CASCADE'), + ) - @staticmethod - def main_pk_name(): - return 'link_uuid' \ No newline at end of file +#class TopologyLinkModel(Base): +# __tablename__ = 'TopologyLink' +# topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid")) +# link_uuid = Column(UUID(as_uuid=False), ForeignKey("Link.link_uuid"), primary_key=True) diff --git a/src/context/service/database/TopologyModel.py b/src/context/service/database/TopologyModel.py index 102e3ae3f11ceace9e9b08a431a8ba32c15bc045..57fe1b3475aa643eba881324f0fc3c05dad74578 100644 --- a/src/context/service/database/TopologyModel.py +++ b/src/context/service/database/TopologyModel.py @@ -12,40 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging #, operator -from typing import Dict #, List -from sqlalchemy import Column, ForeignKey +from typing import Dict +from sqlalchemy import Column, Float, ForeignKey, String from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import relationship from ._Base import _Base -LOGGER = logging.getLogger(__name__) - class TopologyModel(_Base): - __tablename__ = 'Topology' + __tablename__ = 'topology' context_uuid = Column(UUID(as_uuid=False), ForeignKey('context.context_uuid'), primary_key=True) topology_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True) + topology_name = Column(String(), nullable=False) + created_at = Column(Float) # Relationships - context = relationship('ContextModel', back_populates='topology') + context = relationship('ContextModel', back_populates='topology') + topology_device = relationship('TopologyDeviceModel', back_populates='topologies') + #topology_link = relationship('TopologyLinkModel', back_populates='topology') + endpoints = relationship('EndPointModel', back_populates='topology') def dump_id(self) -> Dict: - context_id = self.context.dump_id() return { - 'context_id': context_id, + 'context_id': self.context.dump_id(), 'topology_uuid': {'uuid': self.topology_uuid}, } - #@staticmethod - #def main_pk_name() -> str: - # return 'topology_uuid' - def dump(self) -> Dict: - # pylint: disable=arguments-differ - result = {'topology_id': self.dump_id()} - # params: , devices=None, links=None - #if devices: - # result['device_ids'] = [device.dump_id() for device in devices] - #if links: - # result['link_ids'] = [link.dump_id() for link in links] - return result + return { + 'topology_id': self.dump_id(), + 'name' : self.topology_name, + 'device_ids' : [{'device_uuid': {'uuid': td.device_uuid}} for td in self.topology_device], + #'link_ids' : [{'link_uuid' : {'uuid': td.link_uuid }} for td in self.topology_link ], + } diff --git a/src/context/service/database/__init__.py b/src/context/service/database/__init__.py index c4940470a23d6b672e1389b32419894d0563ab70..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/context/service/database/__init__.py +++ b/src/context/service/database/__init__.py @@ -11,7 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from ._Base import _Base, rebuild_database -from .ContextModel import ContextModel -from .TopologyModel import TopologyModel diff --git a/src/context/tests/test_unitary.py b/src/context/tests/test_unitary.py index 32c571359cfadd8da134b42c48691a697166ef96..c85042d2cd71285c98015a3a8872cf59d4cf7683 100644 --- a/src/context/tests/test_unitary.py +++ b/src/context/tests/test_unitary.py @@ -23,7 +23,7 @@ from context.service.Database import Database from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker from common.proto.context_pb2 import ( - Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, + Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceDriverEnum, DeviceEvent, DeviceId, DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId) from common.proto.policy_pb2 import (PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule) @@ -93,7 +93,10 @@ def context_db_mb(request) -> Tuple[Session, MessageBroker]: #msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...' #LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings))) - _db_engine = Engine().get_engine() + _db_engine = Engine.get_engine() + Engine.drop_database(_db_engine) + Engine.create_database(_db_engine) + rebuild_database(_db_engine) _msg_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) yield _db_engine, _msg_broker @@ -133,16 +136,14 @@ def context_client_grpc(context_service_grpc : ContextService): # pylint: disabl # assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code) # return reply.json() -# ----- Test gRPC methods ---------------------------------------------------------------------------------------------- +# pylint: disable=redefined-outer-name, unused-argument +def test_grpc_initialize(context_client_grpc : ContextClient) -> None: + # dummy method used to initialize fixtures, database, message broker, etc. + pass -def test_grpc_context( - context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name - context_db_mb : Tuple[sqlalchemy.engine.Engine, MessageBroker] # pylint: disable=redefined-outer-name -) -> None: - db_engine = context_db_mb[0] +# ----- Test gRPC methods ---------------------------------------------------------------------------------------------- - # ----- Clean the database ----------------------------------------------------------------------------------------- - rebuild_database(db_engine, drop_if_exists=True) +def test_grpc_context(context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- #events_collector = EventsCollector( @@ -165,14 +166,6 @@ def test_grpc_context( response = context_client_grpc.ListContexts(Empty()) assert len(response.contexts) == 0 - # ----- Dump state of database before create the object ------------------------------------------------------------ - #db_entries = database.dump_all() - #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - #for db_entry in db_entries: - # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover - #LOGGER.info('-----------------------------------------------------------') - #assert len(db_entries) == 0 - # ----- Create the object ------------------------------------------------------------------------------------------ response = context_client_grpc.SetContext(Context(**CONTEXT)) assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID @@ -267,14 +260,6 @@ def test_grpc_context( assert len(response.contexts[0].service_ids) == 0 assert len(response.contexts[0].slice_ids) == 0 - # ----- Dump state of database after create/update the object ------------------------------------------------------ - #db_entries = database.dump_all() - #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - #for db_entry in db_entries: - # LOGGER.info(db_entry) - #LOGGER.info('-----------------------------------------------------------') - #assert len(db_entries) == 1 - # ----- Remove the object ------------------------------------------------------------------------------------------ context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) @@ -294,28 +279,16 @@ def test_grpc_context( # ----- Stop the EventsCollector ----------------------------------------------------------------------------------- #events_collector.stop() - # ----- Dump state of database after remove the object ------------------------------------------------------------- - #db_entries = database.dump_all() - #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - #for db_entry in db_entries: - # LOGGER.info(db_entry) - #LOGGER.info('-----------------------------------------------------------') - #assert len(db_entries) == 0 - -""" -def test_grpc_topology( - context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name - context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - session = context_db_mb[0] - - database = Database(session) - # ----- Clean the database ----------------------------------------------------------------------------------------- - database.clear() +def test_grpc_topology(context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsCollector(context_client_grpc) - events_collector.start() + #events_collector = EventsCollector( + # context_client_grpc, log_events_received=True, + # activate_context_collector = False, activate_topology_collector = True, activate_device_collector = False, + # activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False, + # activate_connection_collector = False) + #events_collector.start() # ----- Prepare dependencies for the test and capture related events ----------------------------------------------- response = context_client_grpc.SetContext(Context(**CONTEXT)) @@ -329,72 +302,90 @@ def test_grpc_topology( with pytest.raises(grpc.RpcError) as e: context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID)) assert e.value.code() == grpc.StatusCode.NOT_FOUND - # assert e.value.details() == 'Topology({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID) - assert e.value.details() == 'Topology({:s}) not found'.format(DEFAULT_TOPOLOGY_UUID) + assert e.value.details() == 'Topology({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID) + # ----- List when the object does not exist ------------------------------------------------------------------------ response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID)) assert len(response.topology_ids) == 0 + response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID)) assert len(response.topologies) == 0 - # ----- Dump state of database before create the object ------------------------------------------------------------ - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 1 - # ----- Create the object ------------------------------------------------------------------------------------------ response = context_client_grpc.SetTopology(Topology(**TOPOLOGY)) assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - CONTEXT_WITH_TOPOLOGY = copy.deepcopy(CONTEXT) - CONTEXT_WITH_TOPOLOGY['topology_ids'].append(TOPOLOGY_ID) - response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_TOPOLOGY)) - assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #CONTEXT_WITH_TOPOLOGY = copy.deepcopy(CONTEXT) + #CONTEXT_WITH_TOPOLOGY['topology_ids'].append(TOPOLOGY_ID) + #response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_TOPOLOGY)) + #assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID # ----- Check create event ----------------------------------------------------------------------------------------- - # events = events_collector.get_events(block=True, count=2) + #events = events_collector.get_events(block=True, count=2) + #assert isinstance(events[0], TopologyEvent) + #assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE + #assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + #assert isinstance(events[1], ContextEvent) + #assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE + #assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - # assert isinstance(events[0], TopologyEvent) - # assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE - # assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - # assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + # ----- Get when the object exists --------------------------------------------------------------------------------- + response = context_client_grpc.GetContext(ContextId(**CONTEXT_ID)) + assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.name == '' + assert len(response.topology_ids) == 1 + assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + + response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID)) + assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + assert response.name == '' + assert len(response.device_ids) == 0 + assert len(response.link_ids) == 0 + + # ----- List when the object exists -------------------------------------------------------------------------------- + response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID)) + assert len(response.topology_ids) == 1 + assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - # assert isinstance(events[1], ContextEvent) - # assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE - # assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID)) + assert len(response.topologies) == 1 + assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + assert response.topologies[0].name == '' + assert len(response.topologies[0].device_ids) == 0 + assert len(response.topologies[0].link_ids) == 0 # ----- Update the object ------------------------------------------------------------------------------------------ - response = context_client_grpc.SetTopology(Topology(**TOPOLOGY)) + new_topology_name = 'new' + TOPOLOGY_WITH_NAME = copy.deepcopy(TOPOLOGY) + TOPOLOGY_WITH_NAME['name'] = new_topology_name + response = context_client_grpc.SetTopology(Topology(**TOPOLOGY_WITH_NAME)) assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID # ----- Check update event ----------------------------------------------------------------------------------------- - # event = events_collector.get_event(block=True) - # assert isinstance(event, TopologyEvent) - # assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE - # assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - # assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - - # ----- Dump state of database after create/update the object ------------------------------------------------------ - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 2 + #event = events_collector.get_event(block=True) + #assert isinstance(event, TopologyEvent) + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE + #assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - # ----- Get when the object exists --------------------------------------------------------------------------------- + # ----- Get when the object is modified ---------------------------------------------------------------------------- response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID)) assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + assert response.name == new_topology_name assert len(response.device_ids) == 0 assert len(response.link_ids) == 0 - # ----- List when the object exists -------------------------------------------------------------------------------- + # ----- List when the object is modified --------------------------------------------------------------------------- response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID)) assert len(response.topology_ids) == 1 assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID @@ -404,50 +395,46 @@ def test_grpc_topology( assert len(response.topologies) == 1 assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + assert response.topologies[0].name == new_topology_name assert len(response.topologies[0].device_ids) == 0 assert len(response.topologies[0].link_ids) == 0 # ----- Remove the object ------------------------------------------------------------------------------------------ context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID)) - context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) # ----- Check remove event ----------------------------------------------------------------------------------------- - # events = events_collector.get_events(block=True, count=2) - - # assert isinstance(events[0], TopologyEvent) - # assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE - # assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - # assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - - # assert isinstance(events[1], ContextEvent) - # assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE - # assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - - # ----- Stop the EventsCollector ----------------------------------------------------------------------------------- - # events_collector.stop() + #event = events_collector.get_event(block=True) + #assert isinstance(event, TopologyEvent) + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE + #assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - # ----- Dump state of database after remove the object ------------------------------------------------------------- - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 0 + # ----- List after deleting the object ----------------------------------------------------------------------------- + response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID)) + assert len(response.topology_ids) == 0 + response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID)) + assert len(response.topologies) == 0 -def test_grpc_device( - context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name - context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - session = context_db_mb[0] + # ----- Clean dependencies used in the test and capture related events --------------------------------------------- + context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) + #event = events_collector.get_event(block=True) + #assert isinstance(event, ContextEvent) + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE + #assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - database = Database(session) + # ----- Stop the EventsCollector ----------------------------------------------------------------------------------- + #events_collector.stop() - # ----- Clean the database ----------------------------------------------------------------------------------------- - database.clear() +def test_grpc_device(context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsCollector(context_client_grpc) - events_collector.start() + #events_collector = EventsCollector( + # context_client_grpc, log_events_received=True, + # activate_context_collector = False, activate_topology_collector = False, activate_device_collector = True, + # activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False, + # activate_connection_collector = False) + #events_collector.start() # ----- Prepare dependencies for the test and capture related events ----------------------------------------------- response = context_client_grpc.SetContext(Context(**CONTEXT)) @@ -457,16 +444,14 @@ def test_grpc_device( assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID - events = events_collector.get_events(block=True, count=2) - - assert isinstance(events[0], ContextEvent) - assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE - assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - - assert isinstance(events[1], TopologyEvent) - assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE - assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID + #events = events_collector.get_events(block=True, count=2) + #assert isinstance(events[0], ContextEvent) + #assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE + #assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert isinstance(events[1], TopologyEvent) + #assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE + #assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID # ----- Get when the object does not exist ------------------------------------------------------------------------- with pytest.raises(grpc.RpcError) as e: @@ -481,14 +466,6 @@ def test_grpc_device( response = context_client_grpc.ListDevices(Empty()) assert len(response.devices) == 0 - # ----- Dump state of database before create the object ------------------------------------------------------------ - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 2 - # ----- Create the object ------------------------------------------------------------------------------------------ with pytest.raises(grpc.RpcError) as e: WRONG_DEVICE = copy.deepcopy(DEVICE_R1) @@ -499,6 +476,7 @@ def test_grpc_device( msg = 'request.device_endpoints[0].device_id.device_uuid.uuid({}) is invalid; '\ 'should be == request.device_id.device_uuid.uuid({})'.format(WRONG_DEVICE_UUID, DEVICE_R1_UUID) assert e.value.details() == msg + response = context_client_grpc.SetDevice(Device(**DEVICE_R1)) assert response.device_uuid.uuid == DEVICE_R1_UUID @@ -508,8 +486,41 @@ def test_grpc_device( # assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE # assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID + # ----- Get when the object exists --------------------------------------------------------------------------------- + response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID)) + assert response.device_id.device_uuid.uuid == DEVICE_R1_UUID + assert response.name == '' + assert response.device_type == 'packet-router' + #assert len(response.device_config.config_rules) == 3 + assert response.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED + assert len(response.device_drivers) == 1 + assert DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG in response.device_drivers + #assert len(response.device_endpoints) == 3 + + # ----- List when the object exists -------------------------------------------------------------------------------- + response = context_client_grpc.ListDeviceIds(Empty()) + assert len(response.device_ids) == 1 + assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID + + response = context_client_grpc.ListDevices(Empty()) + assert len(response.devices) == 1 + assert response.devices[0].device_id.device_uuid.uuid == DEVICE_R1_UUID + assert response.devices[0].name == '' + assert response.devices[0].device_type == 'packet-router' + #assert len(response.devices[0].device_config.config_rules) == 3 + assert response.devices[0].device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED + assert len(response.devices[0].device_drivers) == 1 + assert DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG in response.devices[0].device_drivers + #assert len(response.devices[0].device_endpoints) == 3 + # ----- Update the object ------------------------------------------------------------------------------------------ - response = context_client_grpc.SetDevice(Device(**DEVICE_R1)) + new_device_name = 'r1' + new_device_driver = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED + DEVICE_UPDATED = copy.deepcopy(DEVICE_R1) + DEVICE_UPDATED['name'] = new_device_name + DEVICE_UPDATED['device_operational_status'] = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + DEVICE_UPDATED['device_drivers'].append(new_device_driver) + response = context_client_grpc.SetDevice(Device(**DEVICE_UPDATED)) assert response.device_uuid.uuid == DEVICE_R1_UUID # ----- Check update event ----------------------------------------------------------------------------------------- @@ -518,24 +529,19 @@ def test_grpc_device( # assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE # assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID - # ----- Dump state of database after create/update the object ------------------------------------------------------ - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 47 - - # ----- Get when the object exists --------------------------------------------------------------------------------- + # ----- Get when the object is modified ---------------------------------------------------------------------------- response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID)) assert response.device_id.device_uuid.uuid == DEVICE_R1_UUID + assert response.name == 'r1' assert response.device_type == 'packet-router' - assert len(response.device_config.config_rules) == 3 - assert response.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED - assert len(response.device_drivers) == 1 - assert len(response.device_endpoints) == 3 + #assert len(response.device_config.config_rules) == 3 + assert response.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + assert len(response.device_drivers) == 2 + assert DeviceDriverEnum.DEVICEDRIVER_UNDEFINED in response.device_drivers + assert DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG in response.device_drivers + #assert len(response.device_endpoints) == 3 - # ----- List when the object exists -------------------------------------------------------------------------------- + # ----- List when the object is modified --------------------------------------------------------------------------- response = context_client_grpc.ListDeviceIds(Empty()) assert len(response.device_ids) == 1 assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID @@ -543,11 +549,14 @@ def test_grpc_device( response = context_client_grpc.ListDevices(Empty()) assert len(response.devices) == 1 assert response.devices[0].device_id.device_uuid.uuid == DEVICE_R1_UUID + assert response.devices[0].name == 'r1' assert response.devices[0].device_type == 'packet-router' - assert len(response.devices[0].device_config.config_rules) == 3 - assert response.devices[0].device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED - assert len(response.devices[0].device_drivers) == 1 - assert len(response.devices[0].device_endpoints) == 3 + #assert len(response.devices[0].device_config.config_rules) == 3 + assert response.devices[0].device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + assert len(response.devices[0].device_drivers) == 2 + assert DeviceDriverEnum.DEVICEDRIVER_UNDEFINED in response.devices[0].device_drivers + assert DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG in response.devices[0].device_drivers + #assert len(response.devices[0].device_endpoints) == 3 # ----- Create object relation ------------------------------------------------------------------------------------- TOPOLOGY_WITH_DEVICE = copy.deepcopy(TOPOLOGY) @@ -571,15 +580,7 @@ def test_grpc_device( assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID assert len(response.link_ids) == 0 - # ----- Dump state of database after creating the object relation -------------------------------------------------- - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 47 - - # ----- Remove the object -------------------------------ro----------------------------------------------------------- + # ----- Remove the object ------------------------------------------------------------------------------------------ context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID)) context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID)) context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) @@ -603,15 +604,8 @@ def test_grpc_device( # ----- Stop the EventsCollector ----------------------------------------------------------------------------------- # events_collector.stop() - # ----- Dump state of database after remove the object ------------------------------------------------------------- - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 0 - +""" def test_grpc_link( context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name