Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

Waleed Akbar
committed
import time
import pytest
import logging
from typing import Union
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from context.client.ContextClient import ContextClient
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter)
from telemetry.database.managementDB import managementDB
from telemetry.database.TelemetryEngine import TelemetryEngine
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.NameMapping import NameMapping
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000

Waleed Akbar
committed
TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND))
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
# Mock Service implementing Context to simplify unitary tests of Monitoring
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
@pytest.fixture(scope='session')
def context_service():
LOGGER.info('Initializing MockContextService...')
_service = MockContextService(MOCKSERVICE_PORT)
_service.start()
LOGGER.info('Yielding MockContextService...')
yield _service
LOGGER.info('Terminating MockContextService...')
_service.context_servicer.msg_broker.terminate()
_service.stop()
LOGGER.info('Terminated MockContextService...')
@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing ContextClient...')
_client = ContextClient()
LOGGER.info('Yielding ContextClient...')
yield _client
LOGGER.info('Closing ContextClient...')
_client.close()
LOGGER.info('Closed ContextClient...')
@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
LOGGER.info('Terminated DeviceService...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceClient...')
_client = DeviceClient()
LOGGER.info('Yielding DeviceClient...')
yield _client
LOGGER.info('Closing DeviceClient...')
_client.close()
LOGGER.info('Closed DeviceClient...')
@pytest.fixture(scope='session')
def telemetryFrontend_service(
context_service : MockContextService,
device_service : DeviceService
):
LOGGER.info('Initializing TelemetryFrontendService...')
name_mapping = NameMapping()
_service = TelemetryFrontendService(name_mapping)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendService...')
yield _service
LOGGER.info('Terminating TelemetryFrontendService...')
_service.stop()
LOGGER.info('Terminated TelemetryFrontendService...')
@pytest.fixture(scope='session')
def telemetryFrontend_client(
telemetryFrontend_service : TelemetryFrontendService
):
LOGGER.info('Initializing TelemetryFrontendClient...')
_client = TelemetryFrontendClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding TelemetryFrontendClient...')
yield _client
LOGGER.info('Closing TelemetryFrontendClient...')
_client.close()
LOGGER.info('Closed TelemetryFrontendClient...')
###########################
# Tests Implementation of Telemetry Frontend
###########################
def test_verify_db_and_table():
LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
_engine = TelemetryEngine.get_engine()
managementDB.create_database(_engine)
managementDB.create_tables(_engine)
def test_StartCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StartCollector START: <<< ')
response = telemetryFrontend_client.StartCollector(create_collector_request())
LOGGER.debug(str(response))
assert isinstance(response, CollectorId)
def test_run_kafka_listener():
LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
name_mapping = NameMapping()
TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
LOGGER.debug(str(response))
assert isinstance(response, bool)
def test_StopCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StopCollector START: <<< ')
_collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
time.sleep(3) # wait for small amount before call the stopCollecter()
response = telemetryFrontend_client.StopCollector(_collector_id)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_select_collectors(telemetryFrontend_client):
LOGGER.info(' >>> test_select_collector requesting <<< ')
response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
LOGGER.debug(str(response))
assert isinstance(response, CollectorList)