Loading src/monitoring/tests/test_unitary.py +69 −69 Original line number Diff line number Diff line Loading @@ -381,14 +381,14 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getinstant_kpi begin') kpi_id = monitoring_client.SetKpi(KpiId()) monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) sleep(0.3) response = monitoring_client.GetInstantKpi(kpi_id) LOGGER.debug(response) assert isinstance(response, Kpi) # def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name # LOGGER.warning('test_getinstant_kpi begin') # kpi_id = monitoring_client.SetKpi(KpiId()) # monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) # sleep(0.3) # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name Loading Loading @@ -461,67 +461,67 @@ def test_managementdb_tools_insert_alarm(management_db): _in_range, _include_min_value, _include_max_value) LOGGER.debug(_alarm_id) assert isinstance(_alarm_id,int) def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') _kpiId = "6" for i in range(50): _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') _deviceId = 'DEV4' _endpointId = 'END4' _serviceId = 'SERV4' _sliceId = 'SLC4' _connectionId = 'CON4' _time_stamp = timestamp_utcnow_to_float() _kpi_value = 500*random() metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, _kpi_value) sleep(0.05) _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" _data = metrics_db.run_query(_query) assert len(_data) >= 50 def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): LOGGER.warning('test_subscription_manager_create_subscription begin') subs_queue = Queue() subs_manager = SubscriptionManager(metrics_db) subs_scheduler.add_job(ingestion_data) kpi_id = "3" sampling_duration_s = 20 sampling_interval_s = 3 real_start_time = timestamp_utcnow_to_float() start_timestamp = real_start_time end_timestamp = start_timestamp + sampling_duration_s subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s,start_timestamp,end_timestamp) subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, sampling_duration_s,start_timestamp,end_timestamp) # This is here to simulate application activity (which keeps the main thread alive). total_points = 0 while True: while not subs_queue.empty(): list = subs_queue.get_nowait() kpi_list = KpiList() for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = item[0] kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] kpi_list.kpi.append(kpi) total_points += 1 LOGGER.debug(kpi_list) if timestamp_utcnow_to_float() > end_timestamp: break assert total_points != 0 # # def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name # LOGGER.warning('test_metric_sdb_tools_write_kpi begin') # _kpiId = "6" # # for i in range(50): # _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') # _deviceId = 'DEV4' # _endpointId = 'END4' # _serviceId = 'SERV4' # _sliceId = 'SLC4' # _connectionId = 'CON4' # _time_stamp = timestamp_utcnow_to_float() # _kpi_value = 500*random() # # metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, # _kpi_value) # sleep(0.05) # # _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" # _data = metrics_db.run_query(_query) # assert len(_data) >= 50 # # def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): # LOGGER.warning('test_subscription_manager_create_subscription begin') # subs_queue = Queue() # # subs_manager = SubscriptionManager(metrics_db) # # subs_scheduler.add_job(ingestion_data) # # kpi_id = "3" # sampling_duration_s = 20 # sampling_interval_s = 3 # real_start_time = timestamp_utcnow_to_float() # start_timestamp = real_start_time # end_timestamp = start_timestamp + sampling_duration_s # # subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, # sampling_interval_s,start_timestamp,end_timestamp) # subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, # sampling_duration_s,start_timestamp,end_timestamp) # # # This is here to simulate application activity (which keeps the main thread alive). # total_points = 0 # while True: # while not subs_queue.empty(): # list = subs_queue.get_nowait() # kpi_list = KpiList() # for item in list: # kpi = Kpi() # kpi.kpi_id.kpi_id.uuid = item[0] # kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) # kpi.kpi_value.floatVal = item[2] # kpi_list.kpi.append(kpi) # total_points += 1 # LOGGER.debug(kpi_list) # if timestamp_utcnow_to_float() > end_timestamp: # break # # assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name Loading Loading
src/monitoring/tests/test_unitary.py +69 −69 Original line number Diff line number Diff line Loading @@ -381,14 +381,14 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getinstant_kpi begin') kpi_id = monitoring_client.SetKpi(KpiId()) monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) sleep(0.3) response = monitoring_client.GetInstantKpi(kpi_id) LOGGER.debug(response) assert isinstance(response, Kpi) # def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name # LOGGER.warning('test_getinstant_kpi begin') # kpi_id = monitoring_client.SetKpi(KpiId()) # monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) # sleep(0.3) # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name Loading Loading @@ -461,67 +461,67 @@ def test_managementdb_tools_insert_alarm(management_db): _in_range, _include_min_value, _include_max_value) LOGGER.debug(_alarm_id) assert isinstance(_alarm_id,int) def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') _kpiId = "6" for i in range(50): _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') _deviceId = 'DEV4' _endpointId = 'END4' _serviceId = 'SERV4' _sliceId = 'SLC4' _connectionId = 'CON4' _time_stamp = timestamp_utcnow_to_float() _kpi_value = 500*random() metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, _kpi_value) sleep(0.05) _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" _data = metrics_db.run_query(_query) assert len(_data) >= 50 def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): LOGGER.warning('test_subscription_manager_create_subscription begin') subs_queue = Queue() subs_manager = SubscriptionManager(metrics_db) subs_scheduler.add_job(ingestion_data) kpi_id = "3" sampling_duration_s = 20 sampling_interval_s = 3 real_start_time = timestamp_utcnow_to_float() start_timestamp = real_start_time end_timestamp = start_timestamp + sampling_duration_s subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, sampling_interval_s,start_timestamp,end_timestamp) subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, sampling_duration_s,start_timestamp,end_timestamp) # This is here to simulate application activity (which keeps the main thread alive). total_points = 0 while True: while not subs_queue.empty(): list = subs_queue.get_nowait() kpi_list = KpiList() for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = item[0] kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] kpi_list.kpi.append(kpi) total_points += 1 LOGGER.debug(kpi_list) if timestamp_utcnow_to_float() > end_timestamp: break assert total_points != 0 # # def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name # LOGGER.warning('test_metric_sdb_tools_write_kpi begin') # _kpiId = "6" # # for i in range(50): # _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') # _deviceId = 'DEV4' # _endpointId = 'END4' # _serviceId = 'SERV4' # _sliceId = 'SLC4' # _connectionId = 'CON4' # _time_stamp = timestamp_utcnow_to_float() # _kpi_value = 500*random() # # metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, # _kpi_value) # sleep(0.05) # # _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" # _data = metrics_db.run_query(_query) # assert len(_data) >= 50 # # def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): # LOGGER.warning('test_subscription_manager_create_subscription begin') # subs_queue = Queue() # # subs_manager = SubscriptionManager(metrics_db) # # subs_scheduler.add_job(ingestion_data) # # kpi_id = "3" # sampling_duration_s = 20 # sampling_interval_s = 3 # real_start_time = timestamp_utcnow_to_float() # start_timestamp = real_start_time # end_timestamp = start_timestamp + sampling_duration_s # # subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, # sampling_interval_s,start_timestamp,end_timestamp) # subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, # sampling_duration_s,start_timestamp,end_timestamp) # # # This is here to simulate application activity (which keeps the main thread alive). # total_points = 0 # while True: # while not subs_queue.empty(): # list = subs_queue.get_nowait() # kpi_list = KpiList() # for item in list: # kpi = Kpi() # kpi.kpi_id.kpi_id.uuid = item[0] # kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) # kpi.kpi_value.floatVal = item[2] # kpi_list.kpi.append(kpi) # total_points += 1 # LOGGER.debug(kpi_list) # if timestamp_utcnow_to_float() > end_timestamp: # break # # assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name Loading