Skip to content
Snippets Groups Projects
Commit aa7e998c authored by Ricard Vilalta's avatar Ricard Vilalta
Browse files

First unitary tests

parent c6e5c4cc
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!42Interdomain Component
import os,grpc
from prometheus_client import Summary
from prometheus_client import Counter
from monitoring.service import SqliteTools, InfluxTools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from interdomain.proto import interdomain_pb2
from interdomain.proto import interdomain_pb2_grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.logger import getJSONLogger
from context.proto import context_pb2
from slice.Config import GRPC_SERVICE_PORT
from slice.client.SliceClient import SliceClient
from slice.proto import slice_pb2
from device.Config import GRPC_SERVICE_PORT
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER = getJSONLogger('interdomainservice-server')
LOGGER.setLevel('DEBUG')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
class InterdomainServiceServicerImpl(interdomain_pb2_grpc.InterdomainServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
# Init sqlite monitoring db
self.sql_db = SqliteTools.SQLite('monitoring.db')
# Create influx_db client
self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
LOGGER.info('CreateKpi')
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
LOGGER.info('Init InterdomainService')
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
# rpc Authenticate (context.TeraFlowController) returns (context.AuthenticationResult) {}
def Authenticate(self, request : context_pb2.TeraFlowController) -> context_pb2.AuthenticationResult :
LOGGER.info('Authenticate')
auth_result = context_pb2.AuthenticationResult()
auth_result.context_id = 0
auth_result.authenticated = True
return auth_result
kpi_id.kpi_id.uuid = str(data)
# rpc LookUpSlice(slice.TransportSlice) returns (slice.SliceId) {}
def LookUpSlice ( self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceId:
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('MonitorKpi')
LOGGER.info('LookUpSlice')
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
slice_id = slice_pb2.SliceId()
deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client
# deviceClient.MonitorDeviceKpi(monitor_device_request)
return sliceId
except Exception as e:
LOGGER.exception('LookUpSlice exception')
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
# rpc OrderSliceFromCatalog(slice.TransportSlice) returns (slice.SliceStatus) {}
def OrderSliceFromCatalog(self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceStatus:
LOGGER.info('IncludeKpi')
LOGGER.info('OrderSliceFromCatalog')
try:
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
kpiSampleType = kpiDescriptor.kpi_sample_type
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = request.kpi_value.intVal
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
self.influx_db.read_KPI_points()
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
slice_status=slice_pb2.SliceStatus()
return slice_status
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
LOGGER.exception('OrderSliceFromCatalog exception')
def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns stream monitoring.Kpi
LOGGER.info('GetStreamKpi')
yield monitoring_pb2.Kpi()
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns monitoring.Kpi
LOGGER.info('GetInstantKpi')
return monitoring_pb2.Kpi()
# rpc CreateSliceAndAddToCatalog(slice.TransportSlice) returns (slice.SliceStatus) {}
def CreateSliceAndAddToCatalog(self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceStatus:
LOGGER.info('OrderSliceFromCatalog')
def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
LOGGER.info('getting Kpi by KpiID')
try:
kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid))
print(self.sql_db.get_KPIS())
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
slice_status=slice_pb2.SliceStatus()
return slice_status
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
LOGGER.exception('OrderSliceFromCatalog exception')
import logging, grpc
import os
import sqlite3
import pytest
from typing import Tuple
from interdomain.proto import context_pb2, kpi_sample_types_pb2, monitoring_pb2
from interdomain.client.interdomain_client import InterdomainClient
from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from interdomain.service.InterdomainService import InterdomainService
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
###########################
# Tests Setup
###########################
SERVER_ADDRESS = '127.0.0.1'
LISTEN_ADDRESS = '[::]'
GRPC_PORT_MONITORING = 9090
GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports
SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit
('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
]
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def interdomain_service():
LOGGER.warning('interdomain_service begin')
interdomain_port = GRPC_INTERDOMAIN_PORT
max_workers = GRPC_MAX_WORKERS
grace_period = GRPC_GRACE_PERIOD
LOGGER.info('Initializing InterdomainService...')
grpc_service = InterdomainService(port=interdomain_port, max_workers=max_workers, grace_period=grace_period)
server = grpc_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.warning('interdomain_service yielding')
yield server
LOGGER.info('Terminating InterdomainService...')
grpc_service.stop()
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def interdomain_client(interdomain_service):
LOGGER.warning('interdomain_client begin')
client = InterdomainClient(server=SERVER_ADDRESS, port=GRPC_PORT_INTERDOMAIN) # instantiate the client
LOGGER.warning('interdomain_client returning')
return client
# This fixture will be requested by test cases and last during testing session.
@pytest.fixture(scope='session')
def create_TeraFlowController():
LOGGER.warning('create_TeraFlowController begin')
# form request
tf_ctl = context_pb2.TeraFlowController()
tf_ctl.context_id = context_pb2.ContextId()
tf_ctl.context_id.context_uuid = context_pb2.Uuid()
tf_ctl.context_id.context_uuid.uuid = str(1)
tf_ctl.ip_address = "127.0.0.1"
tf_ctl.port = 9090
return tf_ctl
@pytest.fixture(scope='session')
def create_TransportSlice():
LOGGER.warning('create_TransportSlice begin')
# form request
slice_req = slice_pb2.TransportSlice()
slice_req.contextId = context_pb2.ContextId()
slice_req.contextId.context_uuid = context_pb2.Uuid()
slice_req.contextId.context_uuid.uuid = str(1)
slice_req.slice_id = context_pb2.Uuid()
slice_req.slice_id.context_uuid.uuid = str(1)
return slice_req
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_Authenticate(interdomain_client,create_TeraFlowController):
# make call to server
LOGGER.warning('test_Authenticate requesting')
response = interdomain_client.Authenticate(create_TeraFlowController)
LOGGER.debug(str(response))
assert isinstance(response, context.AuthenticationResult)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_LookUpSlice(interdomain_client,create_TransportSlice):
LOGGER.warning('test_LookUpSlice begin')
response = interdomain_client.LookUpSlice(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_CreateSliceAndAddToCatalog(interdomain_client,create_TransportSlice):
LOGGER.warning('test_CreateSliceAndAddToCatalog begin')
response = interdomain_client.CreateSliceAndAddToCatalog(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_OrderSliceFromCatalog(interdomain_client,create_TransportSlice):
# make call to server
LOGGER.warning('test_OrderSliceFromCatalog requesting')
response = interdomain_client.OrderSliceFromCatalog(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment