Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# import sys
# sys.path.append('.')
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import os, pytest
import logging, json
from random import random
from time import sleep
from typing import Union

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

from common.proto.context_pb2 import ConfigActionEnum, Context, ContextId, DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty, Topology, TopologyId
from common.Constants import ServiceNameEnum
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
# from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, SubsDescriptor, SubsList, AlarmID, \
# AlarmDescriptor, AlarmList, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable #, Kpi, KpiList
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorList
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.DeviceService import DeviceService
from device.client.DeviceClient import DeviceClient
from kpi_manager.tests.test_messages import create_kpi_request, create_kpi_request_b, create_kpi_request_c, create_kpi_request_d
# from monitoring.service.MonitoringService import MonitoringService
from kpi_manager.service.KpiManagerService import KpiManagerService
# from monitoring.client.MonitoringClient import MonitoringClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from monitoring.service.ManagementDBTools import ManagementDB
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.NameMapping import NameMapping
# TODO(review): ContextClient and DRIVERS are referenced in this file but are
# not imported here; add their imports before running the tests.
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
KPIMANAGER_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME')
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
    """Mock gRPC service that serves the Context API for unit tests.

    The service is bound to ``LOCAL_HOST`` on the requested port with the
    health servicer disabled, and it registers a ``MockServicerImpl_Context``.
    """

    def __init__(self, bind_port: Union[str, int]) -> None:
        """Configure the underlying generic gRPC service.

        Args:
            bind_port: Port on which the mock service is bound.
        """
        super().__init__(
            bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        """Create the mock Context servicer and register it on the gRPC server."""
        servicer = MockServicerImpl_Context()
        # Kept as an attribute: the context_service fixture reads it on teardown.
        self.context_servicer = servicer
        add_ContextServiceServicer_to_server(servicer, self.server)
@pytest.fixture(scope='session')
def context_service():
    """Session fixture that runs a MockContextService on MOCKSERVICE_PORT.

    Yields:
        The started MockContextService. After the session, the servicer's
        message broker is terminated and the service is stopped.
    """
    LOGGER.info('Initializing MockContextService...')
    mock_context = MockContextService(MOCKSERVICE_PORT)
    mock_context.start()

    LOGGER.info('Yielding MockContextService...')
    yield mock_context

    LOGGER.info('Terminating MockContextService...')
    # Terminate the broker before stopping the service itself.
    mock_context.context_servicer.msg_broker.terminate()
    mock_context.stop()
    LOGGER.info('Terminated MockContextService...')
@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture providing a ContextClient.

    The ``context_service`` argument is not used directly; requesting it makes
    pytest start the mock Context service before the client is created.

    Yields:
        The ContextClient, which is closed once the session ends.
    """
    LOGGER.info('Initializing ContextClient...')
    client = ContextClient()

    LOGGER.info('Yielding ContextClient...')
    yield client

    LOGGER.info('Closing ContextClient...')
    client.close()
    LOGGER.info('Closed ContextClient...')
@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture that starts a DeviceService built on the DRIVERS set.

    The ``context_service`` argument is not used directly; requesting it makes
    pytest start the mock Context service first.

    Yields:
        The started DeviceService, which is stopped once the session ends.
    """
    LOGGER.info('Initializing DeviceService...')
    # Driver factory -> driver instance cache -> device service.
    device_server = DeviceService(DriverInstanceCache(DriverFactory(DRIVERS)))
    device_server.start()

    # Hand the running server to the tests; execution resumes here to stop it.
    LOGGER.info('Yielding DeviceService...')
    yield device_server

    LOGGER.info('Terminating DeviceService...')
    device_server.stop()
    LOGGER.info('Terminated DeviceService...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture providing a DeviceClient.

    This fixture used to be defined twice with identical bodies; only one
    definition is kept, since the second one simply replaced the first.

    The ``device_service`` argument is not used directly; requesting it makes
    pytest start the DeviceService before the client is created.

    Yields:
        The DeviceClient, which is closed once the session ends.
    """
    LOGGER.info('Initializing DeviceClient...')
    _client = DeviceClient()

    LOGGER.info('Yielding DeviceClient...')
    yield _client

    LOGGER.info('Closing DeviceClient...')
    _client.close()
    LOGGER.info('Closed DeviceClient...')
# Requested by the test cases; the service stays up for the whole testing session.
@pytest.fixture(scope='session')
def kpi_manager_service(
        context_service : MockContextService,  # pylint: disable=redefined-outer-name,unused-argument
        device_service : DeviceService     # pylint: disable=redefined-outer-name,unused-argument
    ):
    """Session fixture that starts a KpiManagerService.

    Neither argument is used directly; requesting them makes pytest start the
    mock Context service and the DeviceService before this service.

    Yields:
        The started KpiManagerService, which is stopped once the session ends.
    """
    LOGGER.info('Initializing KpiManagerService...')
    # KpiManagerService takes the place of the earlier MonitoringService(name_mapping).
    kpi_manager = KpiManagerService(NameMapping())
    kpi_manager.start()

    # Hand the running server to the tests; execution resumes here to stop it.
    LOGGER.info('Yielding KpiManagerService...')
    yield kpi_manager

    LOGGER.info('Terminating KpiManagerService...')
    kpi_manager.stop()
    LOGGER.info('Terminated KpiManagerService...')
# Requested by the test cases; the client lives for the whole testing session.
# The client needs a running server, so the server fixture is declared as a
# dependency (formerly: monitoring_client(monitoring_service : MonitoringService)).
@pytest.fixture(scope='session')
def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
    """Session fixture providing a KpiManagerClient.

    The ``kpi_manager_service`` argument is not used directly; requesting it
    makes pytest start the KpiManagerService before the client is created.

    Yields:
        The KpiManagerClient, which is closed once the session ends.
    """
    LOGGER.info('Initializing KpiManagerClient...')
    client = KpiManagerClient()

    # Hand the client to the tests; execution resumes here to close it.
    LOGGER.info('Yielding KpiManagerClient...')
    yield client

    LOGGER.info('Closing KpiManagerClient...')
    client.close()
    LOGGER.info('Closed KpiManagerClient...')
@pytest.fixture(scope='session')
def management_db():
    """Session fixture returning a ManagementDB created with 'monitoring.db'."""
    return ManagementDB('monitoring.db')
# NOTE(review): the purpose of this fixture was flagged as unclear by the
# original author; review it before relying on it.
@pytest.fixture(scope='session')
def metrics_db(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name
    """Session fixture returning the metrics database held by the running service.

    This fixture used to be defined twice. One definition read an undefined
    ``monitoring_service`` name; the other was annotated with
    ``MonitoringService``, which this module does not import, and depended on a
    ``monitoring_service`` fixture that is not defined here. Both are merged
    into this single definition built on ``kpi_manager_service``.

    Args:
        kpi_manager_service: The running KpiManagerService fixture.

    Returns:
        The ``metrics_db`` object of the service's servicer.
    """
    # NOTE(review): the attribute path is carried over unchanged from the
    # original body; confirm that KpiManagerService exposes
    # `monitoring_servicer.metrics_db`.
    return kpi_manager_service.monitoring_servicer.metrics_db
    # Alternative left disabled by the original author:
    #_metrics_db = MetricsDBTools.MetricsDB(
    #    METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
    #return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
    """Session fixture providing a started BackgroundScheduler.

    The scheduler is configured with a 'processpool' executor of 20 workers.
    Like the other fixtures in this module it now yields, so that the
    scheduler is shut down when the session ends instead of being left running.

    Yields:
        The running BackgroundScheduler.
    """
    _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
    _scheduler.start()
    yield _scheduler
    # A test may already have stopped the scheduler, and shutting down a
    # stopped scheduler raises, so only shut it down while it is running.
    if _scheduler.running:
        # wait=False: do not block session teardown on jobs still executing.
        _scheduler.shutdown(wait=False)
def ingestion_data(kpi_id_int):
    """Write 50 random KPI samples into the metrics database.

    A MetricsDB is opened with ('localhost', '9009', '9000', 'monitoring').
    Each sample is stamped with the current UTC time, carries a random value
    in [0, 500) and is followed by a 0.1 s pause.

    Args:
        kpi_id_int: KPI identifier. It is written as the KPI id and is also
            appended to the 'DEV', 'END', 'SERV', 'SLC' and 'CON' prefixes to
            build the device, endpoint, service, slice and connection ids.
    """
    database = MetricsDB('localhost', '9009', '9000', 'monitoring')

    # Enum name in upper case, with the 'KPISAMPLETYPE_' prefix removed.
    sample_type_name = KpiSampleType.Name(
        KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')

    # These identifiers are the same for every sample, so build them once.
    id_suffix = str(kpi_id_int)
    device_id     = 'DEV'  + id_suffix
    endpoint_id   = 'END'  + id_suffix
    service_id    = 'SERV' + id_suffix
    slice_id      = 'SLC'  + id_suffix
    connection_id = 'CON'  + id_suffix

    for _ in range(50):
        sample_time  = timestamp_utcnow_to_float()
        sample_value = 500*random()
        database.write_KPI(
            sample_time, kpi_id_int, sample_type_name, device_id, endpoint_id, service_id, slice_id,
            connection_id, sample_value)
        sleep(0.1)
##################################################
# Prepare Environment, should be the first test
##################################################

def test_prepare_environment(
        context_client : ContextClient,    # pylint: disable=redefined-outer-name,unused-argument
    ):
    """Create the default context and the default topology through the Context client."""
    context_id = json_context_id(DEFAULT_CONTEXT_NAME)

    default_context = Context(**json_context(DEFAULT_CONTEXT_NAME))
    context_client.SetContext(default_context)

    default_topology = Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id))
    context_client.SetTopology(default_topology)
###########################
# Tests Implementation
###########################

# Uses the client fixture to exercise the server's SetKpi method.
def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
    """Send three SetKpi requests ('1', '2', '3') and check each reply is a KpiId."""
    LOGGER.warning('test_create_kpi requesting')
    for kpi_number in range(1, 4):
        kpi_id = kpi_manager_client.SetKpi(create_kpi_request(str(kpi_number)))
        LOGGER.debug(str(kpi_id))
        assert isinstance(kpi_id, KpiId)
# Uses the client fixture to exercise the server's DeleteKpi method.
def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
    """Create KPI '4', delete it by the returned id, and check the reply is Empty."""
    LOGGER.warning('delete_kpi requesting')
    created_kpi_id = kpi_manager_client.SetKpi(create_kpi_request('4'))
    deletion_reply = kpi_manager_client.DeleteKpi(created_kpi_id)
    LOGGER.debug(str(deletion_reply))
    assert isinstance(deletion_reply, Empty)
# Uses the client fixture to exercise the server's GetKpiDescriptorList method.
def test_get_kpi_descriptor_list(kpi_manager_client): # pylint: disable=redefined-outer-name
    """Request the KPI descriptor list and check the reply is a KpiDescriptorList."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    descriptor_list = kpi_manager_client.GetKpiDescriptorList(Empty())
    LOGGER.debug(str(descriptor_list))
    assert isinstance(descriptor_list, KpiDescriptorList)