Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id
from .Fixtures import context_client, device_client, service_client, kpi_manager_client
from common.tools.context_queries.Topology import get_topology
from common.Constants import DEFAULT_CONTEXT_NAME
from common.tools.context_queries.Device import get_device, add_device_to_topology
# from common.tools.context_queries.EndPoint import get_endpoint_names
#from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names
from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty
from common.proto.kpi_manager_pb2 import KpiId
LOGGER.setLevel(logging.DEBUG)
###########################
# Tests Implementation of Telemetry Backend
###########################
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
yield
LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# # ----- Add Topology -----
# def test_add_to_topology(context_client, device_client, service_client):
# load_topology(context_client, device_client)
# # ----- Add Device to Topology ------
# def test_add_device_to_topology(context_client):
# context_id = ContextId()
# context_id.context_uuid.uuid = "43813baf-195e-5da6-af20-b3d0922e71a7"
# topology_uuid = "c76135e3-24a8-5e92-9bed-c3c9139359c8"
# device_uuid = "69a3a3f0-5237-5f9e-bc96-d450d0c6c03a"
# response = add_device_to_topology( context_client = context_client,
# context_id = context_id,
# topology_uuid = topology_uuid,
# device_uuid = device_uuid
# )
# LOGGER.info(f"Device added to topology: {response}")
# assert response is True
# # ----- Get Topology -----
# def test_get_topology(context_client, device_client):
# response = get_topology(context_client = context_client, topology_uuid = "test1", context_uuid = "test1")
# LOGGER.info(f"Topology: {response}")
# assert response is not None
# def test_set_kpi_descriptor_and_get_device_id(kpi_manager_client):
# kpi_descriptor = _create_kpi_descriptor("1290fb71-bf15-5528-8b69-2d2fabe1fa18")
# kpi_id = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
# LOGGER.info(f"KPI Descriptor set: {kpi_id}")
# assert kpi_id is not None
# response = kpi_manager_client.GetKpiDescriptor(kpi_id)
# # response = kpi_manager_client.GetKpiDescriptor(_create_kpi_id())
# assert response is not None
# LOGGER.info(f"KPI Descriptor: {response}")
# LOGGER.info(f"Device Id: {response.device_id.device_uuid.uuid}")
# LOGGER.info(f"Endpoint Id: {response.endpoint_id.endpoint_uuid.uuid}")
# # ----- Get endpoint detail using device ID -----
# def test_get_device_details(context_client):
# response = get_device(context_client = context_client, device_uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18", include_config_rules = False, include_components = False)
# if response:
# LOGGER.info(f"Device type: {response.device_type}")
# for endpoint in response.device_endpoints:
# if endpoint.endpoint_id.endpoint_uuid.uuid == '36571df2-bac1-5909-a27d-5f42491d2ff0':
# endpoint_dict = {}
# kpi_sample_types = []
# # LOGGER.info(f"Endpoint: {endpoint}")
# # LOGGER.info(f"Enpoint_uuid: {endpoint.endpoint_id.endpoint_uuid.uuid}")
# endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid
# # LOGGER.info(f"Enpoint_name: {endpoint.name}")
# endpoint_dict["name"] = endpoint.name
# # LOGGER.info(f"Enpoint_type: {endpoint.endpoint_type}")
# endpoint_dict["type"] = endpoint.endpoint_type
# for sample_type in endpoint.kpi_sample_types:
# # LOGGER.info(f"Enpoint_sample_types: {sample_type}")
# kpi_sample_types.append(sample_type)
# endpoint_dict["sample_types"] = kpi_sample_types
# LOGGER.info(f"Extracted endpoint dict: {endpoint_dict}")
# else:
# LOGGER.info(f"Endpoint not matched")
# LOGGER.info(f"Device Type: {type(response)}")
# assert response is not None
# # ----- List Conetxts -----
# def test_list_contextIds(context_client):
# empty = Empty()
# response = context_client.ListContexts(empty)
# LOGGER.info(f"Contexts: {response}")
# assert response
# # ----- List Devices -----
# def test_list_devices(context_client):
# empty = Empty()
# response = context_client.ListDeviceIds(empty)
# LOGGER.info(f"Devices: {response}")
# assert response
# ----- Get Endpoints ----- TODO: get_endpoint_names method doesn't return KPI samples types
# def test_get_endpoints(context_client):
# device_id = DeviceId()
# device_id.device_uuid.uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18"
# endpoint_id = EndPointId()
# endpoint_id.endpoint_uuid.uuid = "43b817fa-246f-5e0a-a2e3-2aad0b3e16ca"
# endpoint_id.device_id.CopyFrom(device_id)
# response = get_endpoint_names(context_client = context_client, endpoint_ids = [endpoint_id])
# LOGGER.info(f"Endpoints: {response}")
# assert response is not None
# # ----- List Topologies -----
# def test_list_topologies(context_client):
# context_id = ContextId()
# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
# respone = context_client.ListTopologies(context_id)
# LOGGER.info(f"Topologies: {respone}")
# # ----- Remove Topology -----
# def test_remove_topology(context_client):
# context_id = ContextId()
# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
# topology_id = TopologyId()
# topology_id.topology_uuid.uuid = "9ef0118c-4bca-5e81-808b-dc8f60e2cda4"
# topology_id.context_id.CopyFrom(context_id)
# response = context_client.RemoveTopology(topology_id)
# LOGGER.info(f"Topology removed: {response}")
# # ----- Remove context -----
# def test_remove_context(context_client):
# context_id = ContextId()
# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef"
# response = context_client.RemoveContext(context_id)
# LOGGER.info(f"Context removed: {response}")
@pytest.fixture
def telemetryBackend_service():
LOGGER.info('Initializing TelemetryBackendService...')
_service = TelemetryBackendService()
_service.start()
LOGGER.info('Yielding TelemetryBackendService...')
yield _service
LOGGER.info('Terminating TelemetryBackendService...')
_service.stop()
LOGGER.info('Terminated TelemetryBackendService...')
def test_InitiateCollectorBackend(telemetryBackend_service):
LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...")
time.sleep(30)
LOGGER.info(" Backend Timer Finished Successfully. ")