Commits (10)
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --verbose \
telemetry_frontend/tests/test_unitary.py
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -191,50 +191,6 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
LOGGER.info('Closed KpiManagerClient...')
# @pytest.fixture(scope='session')
# def management_db():
# _management_db = ManagementDB('monitoring.db')
# return _management_db
# @pytest.fixture(scope='session')
# def metrics_db(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name
# return monitoring_service.monitoring_servicer.metrics_db
# # This function os not clear to me (Changes should me made before execution)
# @pytest.fixture(scope='session')
# def metrics_db(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
# return monitoring_service.monitoring_servicer.metrics_db
# #_metrics_db = MetricsDBTools.MetricsDB(
# # METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
# #return _metrics_db
# @pytest.fixture(scope='session')
# def subs_scheduler():
# _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
# _scheduler.start()
# return _scheduler
# def ingestion_data(kpi_id_int):
# # pylint: disable=redefined-outer-name,unused-argument
# metrics_db = MetricsDB('localhost', '9009', '9000', 'monitoring')
# kpiSampleType = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
# kpiSampleType_name = KpiSampleType.Name(kpiSampleType).upper().replace('KPISAMPLETYPE_', '')
# for _ in range(50):
# kpiSampleType = kpiSampleType_name
# kpiId = kpi_id_int
# deviceId = 'DEV'+ str(kpi_id_int)
# endpointId = 'END' + str(kpi_id_int)
# serviceId = 'SERV' + str(kpi_id_int)
# sliceId = 'SLC' + str(kpi_id_int)
# connectionId = 'CON' + str(kpi_id_int)
# time_stamp = timestamp_utcnow_to_float()
# kpi_value = 500*random()
# metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
# kpi_value)
# sleep(0.1)
##################################################
# Prepare Environment, should be the first test
##################################################
......@@ -248,7 +204,7 @@ def test_prepare_environment(
context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
###########################
# Tests Implementation
# Tests Implementation of Kpi Manager
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
......
......@@ -20,7 +20,7 @@ from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceStub
from comment.proto.telemetry_frontend_pb2 import Collector, CollectorId, CollectorFilter, CollectorList
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId, CollectorFilter, CollectorList
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
......@@ -48,21 +48,21 @@ class TelemetryFrontendClient:
self.stub = None
@RETRY_DECORATOR
def StartCollector(self, request : Collector) --> CollectorId:
def StartCollector(self, request : Collector) -> CollectorId: # type: ignore
LOGGER.debug('StartCollector: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StartCollector(request)
LOGGER.debug('StartCollector result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def StopCollector(self, request : CollectorId) --> Empty:
def StopCollector(self, request : CollectorId) -> Empty: # type: ignore
LOGGER.debug('StopCollector: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StopCollector(request)
LOGGER.debug('StopCollector result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectCollectors(self, request : CollectorFilter) --> CollectorList:
def SelectCollectors(self, request : CollectorFilter) -> CollectorList: # type: ignore
LOGGER.debug('SelectCollectors: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectCollectors(request)
LOGGER.debug('SelectCollectors result: {:s}'.format(grpc_message_to_json_string(response)))
......
......@@ -17,7 +17,7 @@ from common.Settings import get_service_port_grpc
from monitoring.service.NameMapping import NameMapping
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.proto.telemetry_frontend_pb2_grpc import add_TelemetryFrontendServiceServicer_to_server
from telemetryfrontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
from telemetry_frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
class TelemetryFrontendService(GenericGrpcService):
......
......@@ -19,4 +19,12 @@ from common.proto import telemetry_frontend_pb2
def collector_id():
_collector_id = telemetry_frontend_pb2.CollectorId()
_collector_id.collector_id.uuid = str(1)
return _collector_id
\ No newline at end of file
return _collector_id
def create_collector_request(coll_id_str):
_create_collector_request = telemetry_frontend_pb2.Collector()
_create_collector_request.collector_id.uuid = str(coll_id_str)
_create_collector_request.kpi_id.kpi_uuid.uuid = 'KPIid' + str(coll_id_str)
_create_collector_request.duration_s = float(-1)
_create_collector_request.interval_s = float(-1)
return _create_collector_request
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pytest
import logging
from typing import Union
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from context.client.ContextClient import ContextClient
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
from telemetry_frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry_frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry_frontend.tests.Messages import create_collector_request
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.NameMapping import NameMapping
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
TELEMETRY_FRONTEND_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
# Mock Service implementing Context to simplify unitary tests of Monitoring
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
@pytest.fixture(scope='session')
def context_service():
LOGGER.info('Initializing MockContextService...')
_service = MockContextService(MOCKSERVICE_PORT)
_service.start()
LOGGER.info('Yielding MockContextService...')
yield _service
LOGGER.info('Terminating MockContextService...')
_service.context_servicer.msg_broker.terminate()
_service.stop()
LOGGER.info('Terminated MockContextService...')
@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing ContextClient...')
_client = ContextClient()
LOGGER.info('Yielding ContextClient...')
yield _client
LOGGER.info('Closing ContextClient...')
_client.close()
LOGGER.info('Closed ContextClient...')
@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
LOGGER.info('Terminated DeviceService...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceClient...')
_client = DeviceClient()
LOGGER.info('Yielding DeviceClient...')
yield _client
LOGGER.info('Closing DeviceClient...')
_client.close()
LOGGER.info('Closed DeviceClient...')
@pytest.fixture(scope='session')
def telemetryFrontend_service(
context_service : MockContextService,
device_service : DeviceService
):
LOGGER.info('Initializing TelemetryFrontendService...')
name_mapping = NameMapping()
_service = TelemetryFrontendService(name_mapping)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendService...')
yield _service
LOGGER.info('Terminating TelemetryFrontendService...')
_service.stop()
LOGGER.info('Terminated TelemetryFrontendService...')
@pytest.fixture(scope='session')
def telemetryFrontend_client(
telemetryFrontend_service : TelemetryFrontendService
):
LOGGER.info('Initializing TelemetryFrontendClient...')
_client = TelemetryFrontendClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendClient...')
yield _client
LOGGER.info('Closing TelemetryFrontendClient...')
_client.close()
LOGGER.info('Closed TelemetryFrontendClient...')
###########################
# Tests Implementation of Telemetry Frontend
###########################
def test_start_collector(telemetryFrontend_client):
LOGGER.warning('test_start_collector requesting')
response = telemetryFrontend_client.StartCollector(create_collector_request('1'))
LOGGER.debug(str(response))
assert isinstance(response, CollectorId)