Skip to content
Snippets Groups Projects
Commit b76fe6df authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry frontend test file updated.

- pytest logging is changed to DEBUG.
- ManagemetDB test is removed.
- Irrelevent imports and methods are removed from the test file.
-
parent ee9c2b6c
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!258Resolve "Re-designing of Telemetry"
......@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
telemetry/frontend/tests/test_frontend.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-cli-level=INFO --verbose \
telemetry/database/tests/managementDBtests.py
......@@ -13,129 +13,39 @@
# limitations under the License.
import os
import time
import pytest
import logging
from typing import Union
from common.proto.context_pb2 import Empty
# from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from context.client.ContextClient import ContextClient
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter)
from telemetry.database.managementDB import managementDB
from telemetry.database.TelemetryEngine import TelemetryEngine
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.NameMapping import NameMapping
from telemetry.frontend.tests.Messages import (
create_collector_request, create_collector_filter)
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
# Mock Service implementing Context to simplify unitary tests of Monitoring
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
@pytest.fixture(scope='session')
def context_service():
LOGGER.info('Initializing MockContextService...')
_service = MockContextService(MOCKSERVICE_PORT)
_service.start()
LOGGER.info('Yielding MockContextService...')
yield _service
LOGGER.info('Terminating MockContextService...')
_service.context_servicer.msg_broker.terminate()
_service.stop()
LOGGER.info('Terminated MockContextService...')
@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing ContextClient...')
_client = ContextClient()
LOGGER.info('Yielding ContextClient...')
yield _client
LOGGER.info('Closing ContextClient...')
_client.close()
LOGGER.info('Closed ContextClient...')
@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
LOGGER.info('Terminated DeviceService...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceClient...')
_client = DeviceClient()
LOGGER.info('Yielding DeviceClient...')
yield _client
LOGGER.info('Closing DeviceClient...')
_client.close()
LOGGER.info('Closed DeviceClient...')
@pytest.fixture(scope='session')
def telemetryFrontend_service(
context_service : MockContextService,
device_service : DeviceService
):
def telemetryFrontend_service():
LOGGER.info('Initializing TelemetryFrontendService...')
name_mapping = NameMapping()
_service = TelemetryFrontendService(name_mapping)
_service = TelemetryFrontendService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
......@@ -168,37 +78,46 @@ def telemetryFrontend_client(
# Tests Implementation of Telemetry Frontend
###########################
def test_verify_db_and_table():
LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
_engine = TelemetryEngine.get_engine()
managementDB.create_database(_engine)
managementDB.create_tables(_engine)
# ------- Re-structuring Test ---------
def test_StartCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StartCollector START: <<< ')
response = telemetryFrontend_client.StartCollector(create_collector_request())
LOGGER.debug(str(response))
assert isinstance(response, CollectorId)
def test_run_kafka_listener():
LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
name_mapping = NameMapping()
TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
LOGGER.debug(str(response))
assert isinstance(response, bool)
def test_StopCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StopCollector START: <<< ')
_collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
time.sleep(3) # wait for small amount before call the stopCollecter()
response = telemetryFrontend_client.StopCollector(_collector_id)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_select_collectors(telemetryFrontend_client):
LOGGER.info(' >>> test_select_collector requesting <<< ')
response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
LOGGER.debug(str(response))
assert isinstance(response, CollectorList)
\ No newline at end of file
# ------- previous test ----------------
# def test_verify_db_and_table():
# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
# _engine = TelemetryEngine.get_engine()
# managementDB.create_database(_engine)
# managementDB.create_tables(_engine)
# def test_StartCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StartCollector START: <<< ')
# response = telemetryFrontend_client.StartCollector(create_collector_request())
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorId)
# def test_run_kafka_listener():
# LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
# name_mapping = NameMapping()
# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# def test_StopCollector(telemetryFrontend_client):
# LOGGER.info(' >>> test_StopCollector START: <<< ')
# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
# time.sleep(3) # wait for small amount before call the stopCollecter()
# response = telemetryFrontend_client.StopCollector(_collector_id)
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_select_collectors(telemetryFrontend_client):
# LOGGER.info(' >>> test_select_collector requesting <<< ')
# response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
# LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
# LOGGER.debug(str(response))
# assert isinstance(response, CollectorList)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment