diff --git a/src/interdomain/service/InterdomainServiceServicerImpl.py b/src/interdomain/service/InterdomainServiceServicerImpl.py index fcadaa0c910cf912055ad30efc1ed389f256e3e4..76357d876c5d8161657610a023448c03f6f574fd 100644 --- a/src/interdomain/service/InterdomainServiceServicerImpl.py +++ b/src/interdomain/service/InterdomainServiceServicerImpl.py @@ -1,163 +1,66 @@ import os,grpc -from prometheus_client import Summary -from prometheus_client import Counter - -from monitoring.service import SqliteTools, InfluxTools -from monitoring.proto import monitoring_pb2 -from monitoring.proto import monitoring_pb2_grpc +from interdomain.proto import interdomain_pb2 +from interdomain.proto import interdomain_pb2_grpc from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.logger import getJSONLogger from context.proto import context_pb2 +from slice.Config import GRPC_SERVICE_PORT +from slice.client.SliceClient import SliceClient +from slice.proto import slice_pb2 -from device.Config import GRPC_SERVICE_PORT -from device.client.DeviceClient import DeviceClient -from device.proto import device_pb2 - -LOGGER = getJSONLogger('monitoringservice-server') +LOGGER = getJSONLogger('interdomainservice-server') LOGGER.setLevel('DEBUG') -MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') -MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') - -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") - - -class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): +class InterdomainServiceServicerImpl(interdomain_pb2_grpc.InterdomainServiceServicer): def __init__(self): - LOGGER.info('Init monitoringService') - - # Init sqlite monitoring db - self.sql_db = SqliteTools.SQLite('monitoring.db') - - # Create influx_db client - self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) - - # CreateKpi (CreateKpiRequest) returns (KpiId) {} - def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId : - # CREATEKPI_COUNTER_STARTED.inc() - LOGGER.info('CreateKpi') - try: - # Here the code to create a sqlite query to crete a KPI and return a KpiID - kpi_id = monitoring_pb2.KpiId() - - kpi_description = request.kpi_description - kpi_sample_type = request.kpi_sample_type - kpi_device_id = request.device_id.device_uuid.uuid - kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid - kpi_service_id = request.service_id.service_uuid.uuid + LOGGER.info('Init InterdomainService') - data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + # rpc Authenticate (context.TeraFlowController) returns (context.AuthenticationResult) {} + def Authenticate(self, request : context_pb2.TeraFlowController) -> context_pb2.AuthenticationResult : + LOGGER.info('Authenticate') + auth_result = context_pb2.AuthenticationResult() + auth_result.context_id = 0 + auth_result.authenticated = True + return auth_result - kpi_id.kpi_id.uuid = str(data) + # rpc LookUpSlice(slice.TransportSlice) returns (slice.SliceId) {} + def LookUpSlice ( self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceId: - # CREATEKPI_COUNTER_COMPLETED.inc() - return kpi_id - except ServiceException as e: - LOGGER.exception('CreateKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('CreateKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - - # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} - def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: - - LOGGER.info('MonitorKpi') + LOGGER.info('LookUpSlice') try: - # Creates the request to send to the device service - monitor_device_request = device_pb2.MonitoringSettings() - - kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) - - monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) - monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.sampling_duration_s = request.sampling_duration_s - monitor_device_request.sampling_interval_s = request.sampling_interval_s + slice_id = slice_pb2.SliceId() - deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client - # deviceClient.MonitorDeviceKpi(monitor_device_request) + return sliceId + except Exception as e: + LOGGER.exception('LookUpSlice exception') - return context_pb2.Empty() - except ServiceException as e: - LOGGER.exception('MonitorKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('MonitorKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} - def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: + # rpc OrderSliceFromCatalog(slice.TransportSlice) returns (slice.SliceStatus) {} + def OrderSliceFromCatalog(self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceStatus: - LOGGER.info('IncludeKpi') + LOGGER.info('OrderSliceFromCatalog') try: - kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) - - kpiSampleType = kpiDescriptor.kpi_sample_type - kpiId = request.kpi_id.kpi_id.uuid - deviceId = kpiDescriptor.device_id.device_uuid.uuid - endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - serviceId = kpiDescriptor.service_id.service_uuid.uuid - time_stamp = request.timestamp - kpi_value = request.kpi_value.intVal - - # Build the structure to be included as point in the influxDB - self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) - - self.influx_db.read_KPI_points() - - return context_pb2.Empty() - except ServiceException as e: - LOGGER.exception('IncludeKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) + slice_status=slice_pb2.SliceStatus() + return slice_status except Exception as e: # pragma: no cover - LOGGER.exception('IncludeKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() + LOGGER.exception('OrderSliceFromCatalog exception') - def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): - # receives monitoring.KpiId returns stream monitoring.Kpi - LOGGER.info('GetStreamKpi') - yield monitoring_pb2.Kpi() - @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() - def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): - # receives monitoring.KpiId returns monitoring.Kpi - LOGGER.info('GetInstantKpi') - return monitoring_pb2.Kpi() + # rpc CreateSliceAndAddToCatalog(slice.TransportSlice) returns (slice.SliceStatus) {} + def CreateSliceAndAddToCatalog(self, request : slice_pb2.TransportSlice) -> slice_pb2.SliceStatus: + LOGGER.info('OrderSliceFromCatalog') - def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor: - LOGGER.info('getting Kpi by KpiID') try: - kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) - print(self.sql_db.get_KPIS()) - - kpiDescriptor = monitoring_pb2.KpiDescriptor() - - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) - - return kpiDescriptor - except ServiceException as e: - LOGGER.exception('GetKpiDescriptor exception') - grpc_context.abort(e.code, e.details) - + slice_status=slice_pb2.SliceStatus() + return slice_status except Exception as e: # pragma: no cover - LOGGER.exception('GetKpiDescriptor exception') + LOGGER.exception('OrderSliceFromCatalog exception') diff --git a/src/interdomain/tests/__init__.py b/src/interdomain/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/interdomain/tests/test_unitary.py b/src/interdomain/tests/test_unitary.py new file mode 100644 index 0000000000000000000000000000000000000000..57fe4d891fe435a06b309c16bc0dbbf21b05f129 --- /dev/null +++ b/src/interdomain/tests/test_unitary.py @@ -0,0 +1,131 @@ +import logging, grpc +import os +import sqlite3 + +import pytest +from typing import Tuple + +from interdomain.proto import context_pb2, kpi_sample_types_pb2, monitoring_pb2 +from interdomain.client.interdomain_client import InterdomainClient +from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from interdomain.service.InterdomainService import InterdomainService + +from common.orm.Database import Database +from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum +from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum +from common.message_broker.MessageBroker import MessageBroker + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +########################### +# Tests Setup +########################### + +SERVER_ADDRESS = '127.0.0.1' +LISTEN_ADDRESS = '[::]' +GRPC_PORT_MONITORING = 9090 + +GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports + +SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit + ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), +] + + +# This fixture will be requested by test cases and last during testing session +@pytest.fixture(scope='session') +def interdomain_service(): + LOGGER.warning('interdomain_service begin') + + interdomain_port = GRPC_INTERDOMAIN_PORT + max_workers = GRPC_MAX_WORKERS + grace_period = GRPC_GRACE_PERIOD + + LOGGER.info('Initializing InterdomainService...') + grpc_service = InterdomainService(port=interdomain_port, max_workers=max_workers, grace_period=grace_period) + server = grpc_service.start() + + # yield the server, when test finishes, execution will resume to stop it + LOGGER.warning('interdomain_service yielding') + yield server + + LOGGER.info('Terminating InterdomainService...') + grpc_service.stop() + +# This fixture will be requested by test cases and last during testing session. +# The client requires the server, so client fixture has the server as dependency. +@pytest.fixture(scope='session') +def interdomain_client(interdomain_service): + LOGGER.warning('interdomain_client begin') + client = InterdomainClient(server=SERVER_ADDRESS, port=GRPC_PORT_INTERDOMAIN) # instantiate the client + LOGGER.warning('interdomain_client returning') + return client + +# This fixture will be requested by test cases and last during testing session. +@pytest.fixture(scope='session') +def create_TeraFlowController(): + LOGGER.warning('create_TeraFlowController begin') + # form request + tf_ctl = context_pb2.TeraFlowController() + tf_ctl.context_id = context_pb2.ContextId() + tf_ctl.context_id.context_uuid = context_pb2.Uuid() + tf_ctl.context_id.context_uuid.uuid = str(1) + tf_ctl.ip_address = "127.0.0.1" + tf_ctl.port = 9090 + return tf_ctl + +@pytest.fixture(scope='session') +def create_TransportSlice(): + LOGGER.warning('create_TransportSlice begin') + + # form request + slice_req = slice_pb2.TransportSlice() + slice_req.contextId = context_pb2.ContextId() + slice_req.contextId.context_uuid = context_pb2.Uuid() + slice_req.contextId.context_uuid.uuid = str(1) + slice_req.slice_id = context_pb2.Uuid() + slice_req.slice_id.context_uuid.uuid = str(1) + + return slice_req + + +########################### +# Tests Implementation +########################### + + +# Test case that makes use of client fixture to test server's CreateKpi method +def test_Authenticate(interdomain_client,create_TeraFlowController): + # make call to server + LOGGER.warning('test_Authenticate requesting') + response = interdomain_client.Authenticate(create_TeraFlowController) + LOGGER.debug(str(response)) + assert isinstance(response, context.AuthenticationResult) + +# Test case that makes use of client fixture to test server's MonitorKpi method +def test_LookUpSlice(interdomain_client,create_TransportSlice): + LOGGER.warning('test_LookUpSlice begin') + + response = interdomain_client.LookUpSlice(create_TransportSlice) + LOGGER.debug(str(response)) + assert isinstance(response, slice.SliceId) + +# Test case that makes use of client fixture to test server's GetStreamKpi method +def test_CreateSliceAndAddToCatalog(interdomain_client,create_TransportSlice): + LOGGER.warning('test_CreateSliceAndAddToCatalog begin') + response = interdomain_client.CreateSliceAndAddToCatalog(create_TransportSlice) + LOGGER.debug(str(response)) + assert isinstance(response, slice.SliceId) + +# Test case that makes use of client fixture to test server's IncludeKpi method +def test_OrderSliceFromCatalog(interdomain_client,create_TransportSlice): + # make call to server + LOGGER.warning('test_OrderSliceFromCatalog requesting') + response = interdomain_client.OrderSliceFromCatalog(create_TransportSlice) + LOGGER.debug(str(response)) + assert isinstance(response, slice.SliceId) + + + +