Loading src/tests/scenario2/tests/test_functional_cleanup.py +25 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,9 @@ from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from common.proto.context_pb2 import ContextId, DeviceId, Empty, LinkId, TopologyId from common.proto.monitoring_pb2 import KpiId, KpiDescriptorList, AlarmList, AlarmID from device.client.DeviceClient import DeviceClient from monitoring.client.MonitoringClient import MonitoringClient from tests.Fixtures import context_client, device_client from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES Loading @@ -49,7 +51,7 @@ def test_services_removed(context_client : ContextClient): # pylint: disable=re def test_scenario_cleanup( context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name context_client : ContextClient, device_client : DeviceClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name # ----- Start the EventsCollector ---------------------------------------------------------------------------------- #events_collector = EventsCollector(context_client) Loading Loading @@ -91,6 +93,21 @@ def test_scenario_cleanup( context_client.RemoveContext(ContextId(**context_id)) #expected_events.append(('ContextEvent', EVENT_REMOVE, json_context_id(context_uuid))) # ----- Delete Alarms ---------------------------------------------------------------------------------------------- response: AlarmList = monitoring_client.GetAlarms(Empty()) for alarm in response.alarm_descriptor: alarm_id = AlarmID() alarm_id.alarm_id.uuid = alarm.alarm_id.alarm_id.uuid monitoring_client.DeleteAlarm(alarm_id) # ----- Delete Kpis ------------------------------------------------------------------------------------------------ response: KpiDescriptorList = monitoring_client.GetKpiDescriptorList(Empty()) for kpi_descriptor in response.kpi_descriptor_list: kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_descriptor.kpi_id.kpi_id.uuid monitoring_client.DeleteKpi(kpi_id) # ----- Validate Collected Events ---------------------------------------------------------------------------------- #check_events(events_collector, expected_events) Loading @@ -98,7 +115,7 @@ def test_scenario_cleanup( #events_collector.stop() def test_scenario_empty_again(context_client : ContextClient): # pylint: disable=redefined-outer-name def test_scenario_empty_again(context_client : ContextClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name # ----- List entities - Ensure database is empty again ------------------------------------------------------------- response = context_client.ListContexts(Empty()) assert len(response.contexts) == 0 Loading @@ -108,3 +125,9 @@ def test_scenario_empty_again(context_client : ContextClient): # pylint: disabl response = context_client.ListLinks(Empty()) assert len(response.links) == 0 response = monitoring_client.GetAlarms(Empty()) assert len(response.response.alarm_descriptor) == 0 response = monitoring_client.GetKpiDescriptorList(Empty()) assert len(response.kpi_descriptor_list) == 0 Loading
src/tests/scenario2/tests/test_functional_cleanup.py +25 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,9 @@ from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from common.proto.context_pb2 import ContextId, DeviceId, Empty, LinkId, TopologyId from common.proto.monitoring_pb2 import KpiId, KpiDescriptorList, AlarmList, AlarmID from device.client.DeviceClient import DeviceClient from monitoring.client.MonitoringClient import MonitoringClient from tests.Fixtures import context_client, device_client from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES Loading @@ -49,7 +51,7 @@ def test_services_removed(context_client : ContextClient): # pylint: disable=re def test_scenario_cleanup( context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name context_client : ContextClient, device_client : DeviceClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name # ----- Start the EventsCollector ---------------------------------------------------------------------------------- #events_collector = EventsCollector(context_client) Loading Loading @@ -91,6 +93,21 @@ def test_scenario_cleanup( context_client.RemoveContext(ContextId(**context_id)) #expected_events.append(('ContextEvent', EVENT_REMOVE, json_context_id(context_uuid))) # ----- Delete Alarms ---------------------------------------------------------------------------------------------- response: AlarmList = monitoring_client.GetAlarms(Empty()) for alarm in response.alarm_descriptor: alarm_id = AlarmID() alarm_id.alarm_id.uuid = alarm.alarm_id.alarm_id.uuid monitoring_client.DeleteAlarm(alarm_id) # ----- Delete Kpis ------------------------------------------------------------------------------------------------ response: KpiDescriptorList = monitoring_client.GetKpiDescriptorList(Empty()) for kpi_descriptor in response.kpi_descriptor_list: kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_descriptor.kpi_id.kpi_id.uuid monitoring_client.DeleteKpi(kpi_id) # ----- Validate Collected Events ---------------------------------------------------------------------------------- #check_events(events_collector, expected_events) Loading @@ -98,7 +115,7 @@ def test_scenario_cleanup( #events_collector.stop() def test_scenario_empty_again(context_client : ContextClient): # pylint: disable=redefined-outer-name def test_scenario_empty_again(context_client : ContextClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name # ----- List entities - Ensure database is empty again ------------------------------------------------------------- response = context_client.ListContexts(Empty()) assert len(response.contexts) == 0 Loading @@ -108,3 +125,9 @@ def test_scenario_empty_again(context_client : ContextClient): # pylint: disabl response = context_client.ListLinks(Empty()) assert len(response.links) == 0 response = monitoring_client.GetAlarms(Empty()) assert len(response.response.alarm_descriptor) == 0 response = monitoring_client.GetKpiDescriptorList(Empty()) assert len(response.kpi_descriptor_list) == 0