Skip to content
Snippets Groups Projects
Commit e50c6e93 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

some missing imports were added and some methods are commented temporarily

parent 7b88c37f
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!207Resolve "(CTTC) Separation of Monitoring"
......@@ -17,6 +17,7 @@
# sys.path.append('.')
import os, pytest
import logging, json
from typing import Union
from apscheduler.schedulers.background import BackgroundScheduler
......@@ -33,6 +34,8 @@ from common.tools.object_factory.Topology import json_topology, json_topology_id
# from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, SubsDescriptor, SubsList, AlarmID, \
# AlarmDescriptor, AlarmList, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable #, Kpi, KpiList
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.client.ContextClient import ContextClient
from device.service.driver_api.DriverFactory import DriverFactory
......@@ -50,6 +53,9 @@ from monitoring.service.ManagementDBTools import ManagementDB
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.NameMapping import NameMapping
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
......@@ -185,49 +191,49 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
LOGGER.info('Closed KpiManagerClient...')
@pytest.fixture(scope='session')
def management_db():
_management_db = ManagementDB('monitoring.db')
return _management_db
@pytest.fixture(scope='session')
def metrics_db(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name
return monitoring_service.monitoring_servicer.metrics_db
# This function os not clear to me (Changes should me made before execution)
@pytest.fixture(scope='session')
def metrics_db(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
return monitoring_service.monitoring_servicer.metrics_db
#_metrics_db = MetricsDBTools.MetricsDB(
# METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
#return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
_scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
_scheduler.start()
return _scheduler
def ingestion_data(kpi_id_int):
# pylint: disable=redefined-outer-name,unused-argument
metrics_db = MetricsDB('localhost', '9009', '9000', 'monitoring')
kpiSampleType = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
kpiSampleType_name = KpiSampleType.Name(kpiSampleType).upper().replace('KPISAMPLETYPE_', '')
for _ in range(50):
kpiSampleType = kpiSampleType_name
kpiId = kpi_id_int
deviceId = 'DEV'+ str(kpi_id_int)
endpointId = 'END' + str(kpi_id_int)
serviceId = 'SERV' + str(kpi_id_int)
sliceId = 'SLC' + str(kpi_id_int)
connectionId = 'CON' + str(kpi_id_int)
time_stamp = timestamp_utcnow_to_float()
kpi_value = 500*random()
metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
kpi_value)
sleep(0.1)
# @pytest.fixture(scope='session')
# def management_db():
# _management_db = ManagementDB('monitoring.db')
# return _management_db
# @pytest.fixture(scope='session')
# def metrics_db(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name
# return monitoring_service.monitoring_servicer.metrics_db
# # This function os not clear to me (Changes should me made before execution)
# @pytest.fixture(scope='session')
# def metrics_db(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
# return monitoring_service.monitoring_servicer.metrics_db
# #_metrics_db = MetricsDBTools.MetricsDB(
# # METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
# #return _metrics_db
# @pytest.fixture(scope='session')
# def subs_scheduler():
# _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
# _scheduler.start()
# return _scheduler
# def ingestion_data(kpi_id_int):
# # pylint: disable=redefined-outer-name,unused-argument
# metrics_db = MetricsDB('localhost', '9009', '9000', 'monitoring')
# kpiSampleType = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
# kpiSampleType_name = KpiSampleType.Name(kpiSampleType).upper().replace('KPISAMPLETYPE_', '')
# for _ in range(50):
# kpiSampleType = kpiSampleType_name
# kpiId = kpi_id_int
# deviceId = 'DEV'+ str(kpi_id_int)
# endpointId = 'END' + str(kpi_id_int)
# serviceId = 'SERV' + str(kpi_id_int)
# sliceId = 'SLC' + str(kpi_id_int)
# connectionId = 'CON' + str(kpi_id_int)
# time_stamp = timestamp_utcnow_to_float()
# kpi_value = 500*random()
# metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
# kpi_value)
# sleep(0.1)
##################################################
# Prepare Environment, should be the first test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment