diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 533761bab2ed225e3f8d82f5df7d9290f7fa01b8..55bcb53e4c1c404bd7203f7c7ecc6d0c260d5a54 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -16,8 +16,11 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .AnalyticsBackendService import AnalyticsBackendService +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -36,6 +39,8 @@ def main(): LOGGER.info('Starting...') + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index edf94c4fdd828c9a195d8695b0ba52b544b6a863..a79b2bbc63b5543e11ee8bda92e1b8d9fdda967d 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService from analytics.database.AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -43,6 +46,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py index 1d38a1f440af643c253d5d17bda8ca9e6c3fdc44..89b6c2b6dd1d4c94dc2d1082ce0051a82078ddd8 100644 --- a/src/common/tools/database/GenericEngine.py +++ b/src/common/tools/database/GenericEngine.py @@ -33,8 +33,8 @@ class Engine: CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri)) + LOGGER.info(' Database initalized with DB URL: {:}'.format(crdb_uri)) + return engine except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore - return engine diff --git a/src/context/service/database/models/enums/KpiSampleType.py b/src/context/service/database/models/enums/KpiSampleType.py index 77b568dcfc809447851bd39fc5093ab60ca67892..66afdb710720f7bd272a8764a4c624fb7a563ab7 100644 --- a/src/context/service/database/models/enums/KpiSampleType.py +++ b/src/context/service/database/models/enums/KpiSampleType.py @@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum # BYTES_RECEIVED. If item name does not match, automatic mapping of # proto enums to database enums will fail. class ORM_KpiSampleTypeEnum(enum.Enum): - UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED - LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS - LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS + UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN # 0 + PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED # 101 + PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 102 + PACKETS_DROPPED = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED # 103 + BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED # 201 + BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED # 202 + BYTES_DROPPED = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED # 203 + LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301 + LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS # 302 + grpc_to_enum__kpi_sample_type = functools.partial( grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index ebe13b661ffe34543808b386c2d1a3b76823e455..811661a4efa8511cbaa270c6b5a00ba34fc5af7f 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -26,15 +26,15 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' + _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' return _create_kpi_request diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index 4bc5605d59bdb2e2b24cf580ce8e2a6978caa2e8..7448f1ebc60015e13499a7a702750c75214494bc 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -62,6 +62,10 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow +COPY src/context/__init__.py context/__init__.py +COPY src/context/client/. context/client/ +COPY src/kpi_manager/client/. kpi_manager/client/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. telemetry/backend/ diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index d6e711d65c763cb3cefd36f1798f5c97656f0a92..a4bd7f17f254873cb5fc6d3302d257e7f1e35f12 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -71,69 +71,69 @@ class _Collector: """ raise NotImplementedError() - def GetInitialConfig(self) -> List[Tuple[str, Any]]: - """ Retrieve initial configuration of entire device. - Returns: - values : List[Tuple[str, Any]] - List of tuples (resource key, resource value) for - resource keys. - """ - raise NotImplementedError() - - def GetConfig(self, resource_keys: List[str] = []) -> \ - List[Tuple[str, Union[Any, None, Exception]]]: - """ Retrieve running configuration of entire device or - selected resource keys. - Parameters: - resource_keys : List[str] - List of keys pointing to the resources to be retrieved. - Returns: - values : List[Tuple[str, Union[Any, None, Exception]]] - List of tuples (resource key, resource value) for - resource keys requested. If a resource is found, - the appropriate value type must be retrieved. - If a resource is not found, None must be retrieved as - value for that resource. In case of Exception, - the Exception must be retrieved as value. - """ - raise NotImplementedError() - - def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Create/Update configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - the new value to be set. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key changes requested. - Return values must be in the same order as the - resource keys requested. If a resource is properly set, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ - raise NotImplementedError() - - def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Delete configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - possible additionally required values to locate - the value to be removed. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key deletions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly deleted, True must be - retrieved; otherwise, the Exception that is raised during - the processing must be retrieved. - """ - raise NotImplementedError() + # def GetInitialConfig(self) -> List[Tuple[str, Any]]: + # """ Retrieve initial configuration of entire device. + # Returns: + # values : List[Tuple[str, Any]] + # List of tuples (resource key, resource value) for + # resource keys. + # """ + # raise NotImplementedError() + + # def GetConfig(self, resource_keys: List[str] = []) -> \ + # List[Tuple[str, Union[Any, None, Exception]]]: + # """ Retrieve running configuration of entire device or + # selected resource keys. + # Parameters: + # resource_keys : List[str] + # List of keys pointing to the resources to be retrieved. + # Returns: + # values : List[Tuple[str, Union[Any, None, Exception]]] + # List of tuples (resource key, resource value) for + # resource keys requested. If a resource is found, + # the appropriate value type must be retrieved. + # If a resource is not found, None must be retrieved as + # value for that resource. In case of Exception, + # the Exception must be retrieved as value. + # """ + # raise NotImplementedError() + + # def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Create/Update configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # the new value to be set. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key changes requested. + # Return values must be in the same order as the + # resource keys requested. If a resource is properly set, + # True must be retrieved; otherwise, the Exception that is + # raised during the processing must be retrieved. + # """ + # raise NotImplementedError() + + # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Delete configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # possible additionally required values to locate + # the value to be removed. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key deletions requested. + # Return values must be in the same order as the resource keys + # requested. If a resource is properly deleted, True must be + # retrieved; otherwise, the Exception that is raised during + # the processing must be retrieved. + # """ + # raise NotImplementedError() def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \ bool: diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index c392efd1dbbaa7a31e3bcffd57b93fbd008b198e..40cd1443a3ebe505e43f2d7142191ffb0c7d8919 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -16,7 +16,7 @@ import json import time import logging import threading -from typing import Any, Dict +from typing import Any, Dict, Tuple from datetime import datetime, timezone from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -26,10 +26,15 @@ from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService +from common.tools.context_queries.Device import get_device +from common.proto.kpi_manager_pb2 import KpiId + +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from context.client.ContextClient import ContextClient from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): """ @@ -44,7 +49,9 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.context_client = ContextClient() + self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} def install_servicers(self): @@ -116,16 +123,13 @@ class TelemetryBackendService(GenericGrpcService): """ Method to handle collector request. """ - end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) - if not end_points: + device_type, end_points = self.get_endpoint_detail(kpi_id) + # end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) + if end_points is None: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) - - device_type : str = self.get_device_type_from_kpi_id(kpi_id) - - if device_type == "Unknown": - LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id)) - - if device_type == "EMU-Device": + return + # device_type : str = self.get_device_type_from_kpi_id(kpi_id) + if device_type and "emu" in device_type: LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) @@ -186,28 +190,40 @@ class TelemetryBackendService(GenericGrpcService): except: LOGGER.exception("Error terminating job: {:}".format(job_id)) -# --- Mock Methods --- - def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict: + def get_endpoint_detail(self, kpi_id: str): """ - Method to get endpoints based on kpi_id. + Method to get device_type and endpoint detail based on device_uuid. """ - kpi_endpoints = { - '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]}, - '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, - '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, - } - return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {} + kpi_id_obj = KpiId() + kpi_id_obj.kpi_id.uuid = kpi_id + kpi_descriptor = self.kpi_manager_client.GetKpiDescriptor(kpi_id_obj) + if not kpi_descriptor: + LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") + return (None, None) - def get_device_type_from_kpi_id(self, kpi_id: str) -> str: - """ - Method to get device type based on kpi_id. - """ - kpi_device_types = { - "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"}, - "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"}, - "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"}, - } - return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") + device_id = kpi_descriptor.device_id.device_uuid.uuid + endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid + device = get_device( context_client = self.context_client, + device_uuid = device_id, + include_config_rules = False, + include_components = False, + ) + if device: + for endpoint in device.device_endpoints: + if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id: + endpoint_dict = {} + kpi_sample_types = [] + endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_dict["name"] = endpoint.name + endpoint_dict["type"] = endpoint.endpoint_type + for sample_type in endpoint.kpi_sample_types: + kpi_sample_types.append(sample_type) + endpoint_dict["sample_types"] = kpi_sample_types + + return (device.device_type, endpoint_dict) + + LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found") + return (None, None) def delivery_callback(self, err, msg): if err: diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 61ff397214fa21ff2d767c2eda4b3a7ee1f796b5..6e77d5d6cc7e31792d737bca04fb1af0e7baa2bb 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -16,6 +16,7 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .TelemetryBackendService import TelemetryBackendService +from common.tools.kafka.Variables import KafkaTopic terminate = threading.Event() LOGGER = None @@ -34,6 +35,8 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + KafkaTopic.create_all_topics() + LOGGER.info('Starting...') # Start metrics server diff --git a/src/telemetry/backend/tests/Fixtures.py b/src/telemetry/backend/tests/Fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..59f1b761ca40caf2013471c7f6fdbaa781759a0a --- /dev/null +++ b/src/telemetry/backend/tests/Fixtures.py @@ -0,0 +1,58 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import logging + +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from service.client.ServiceClient import ServiceClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient + + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +@pytest.fixture(scope='session') +def context_client(): + _client = ContextClient(host="10.152.183.234") + _client.connect() + LOGGER.info('Yielding Connected ContextClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_client(): + _client = DeviceClient(host="10.152.183.95") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def service_client(): + _client = ServiceClient(host="10.152.183.47") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def kpi_manager_client(): + _client = KpiManagerClient(host="10.152.183.118") + LOGGER.info('Yielding Connected KpiManagerClient...') + yield _client + _client.close() + LOGGER.info('Closed KpiManagerClient...') diff --git a/src/telemetry/backend/tests/add_devices.py b/src/telemetry/backend/tests/add_devices.py new file mode 100644 index 0000000000000000000000000000000000000000..9fe02a953ec8a789ea5faa2aa49f3829b33aa721 --- /dev/null +++ b/src/telemetry/backend/tests/add_devices.py @@ -0,0 +1,78 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, os, time +from common.Constants import DEFAULT_CONTEXT_NAME +from common.proto.context_pb2 import ContextId, DeviceOperationalStatusEnum, Empty +from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario +from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from .Fixtures import context_client, device_client # pylint: disable=unused-import + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +DESCRIPTOR_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'topology.json') +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + +def load_topology( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name +) -> None: + LOGGER.info('Loading Topology...') + validate_empty_scenario(context_client) + descriptor_loader = DescriptorLoader( + descriptors_file=DESCRIPTOR_FILE, context_client=context_client, device_client=device_client) + LOGGER.info('Descriptor Loader Created') + results = descriptor_loader.process() + # LOGGER.info('Descriptor Load Results: {:s}'.format(str(results))) + check_descriptor_load_results(results, descriptor_loader) + # descriptor_loader.validate() + + # Verify the scenario has no services/slices + response = context_client.GetContext(ADMIN_CONTEXT_ID) + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + +def test_scenario_devices_enabled( + context_client : ContextClient, # pylint: disable=redefined-outer-name +) -> None: + """ + This test validates that the devices are enabled. + """ + DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + + disabled_devices = list() + response = None + num_devices = -1 + num_devices_enabled, num_retry = 0, 0 + while (num_devices != num_devices_enabled) and (num_retry < 10): + time.sleep(1.0) + response = context_client.ListDevices(Empty()) + num_devices = len(response.devices) + num_devices_enabled = 0 + disabled_devices = list() + for device in response.devices: + if device.device_operational_status == DEVICE_OP_STATUS_ENABLED: + num_devices_enabled += 1 + else: + disabled_devices.append(grpc_message_to_json(device)) + LOGGER.info('Num Devices enabled: {:d}/{:d}'.format(num_devices_enabled, num_devices)) + num_retry += 1 + if num_devices_enabled != num_devices: + LOGGER.info('Disabled Devices: {:s}'.format(str(disabled_devices))) + LOGGER.info('Devices: {:s}'.format(grpc_message_to_json_string(response))) + assert num_devices_enabled == num_devices diff --git a/src/telemetry/backend/tests/messages.py b/src/telemetry/backend/tests/messages.py index f6a2bb247f28d10654746e0c75b6ed1973382e38..0d31cd15f2038c3065d679e34b1b772d37aaaf45 100644 --- a/src/telemetry/backend/tests/messages.py +++ b/src/telemetry/backend/tests/messages.py @@ -15,8 +15,8 @@ import uuid import random from common.proto import telemetry_frontend_pb2 -# from common.proto.kpi_sample_types_pb2 import KpiSampleType -# from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() @@ -24,8 +24,25 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = float(random.randint(30, 50)) # _create_collector_request.duration_s = -1 _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request +def _create_kpi_descriptor(device_id : str = ""): + _create_kpi_request = KpiDescriptor() + _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_description = "Test Description" + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = device_id + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = '36571df2-bac1-5909-a27d-5f42491d2ff0' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' + return _create_kpi_request + +def _create_kpi_id(kpi_id : str = "fc046641-0c9a-4750-b4d9-9f98401714e2"): + _create_kpi_id_request = KpiId() + _create_kpi_id_request.kpi_id.uuid = kpi_id + return _create_kpi_id_request diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 28b92fb29c60eb099d253d12c36fa59b87b0a701..1329aa969a4fed5baa887dd12d120f41eb56c2fa 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -16,11 +16,19 @@ import pytest import logging import time from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService -from .messages import create_collector_request -from .Fixtures import context_client, device_client +from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id +from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .add_devices import load_topology +from common.tools.context_queries.Topology import get_topology +from common.Constants import DEFAULT_CONTEXT_NAME +from common.tools.context_queries.Device import get_device, add_device_to_topology +# from common.tools.context_queries.EndPoint import get_endpoint_names +from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names +from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty +from common.proto.kpi_manager_pb2 import KpiId LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) ########################### @@ -37,6 +45,120 @@ def log_all_methods(request): yield LOGGER.info(f" <<<<< Finished test: {request.node.name} ") +# # ----- Add Topology ----- +# def test_add_to_topology(context_client, device_client, service_client): +# load_topology(context_client, device_client) + +# # ----- Add Device to Topology ------ +# def test_add_device_to_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "43813baf-195e-5da6-af20-b3d0922e71a7" +# topology_uuid = "c76135e3-24a8-5e92-9bed-c3c9139359c8" +# device_uuid = "69a3a3f0-5237-5f9e-bc96-d450d0c6c03a" +# response = add_device_to_topology( context_client = context_client, +# context_id = context_id, +# topology_uuid = topology_uuid, +# device_uuid = device_uuid +# ) +# LOGGER.info(f"Device added to topology: {response}") +# assert response is True + +# # ----- Get Topology ----- +# def test_get_topology(context_client, device_client): +# response = get_topology(context_client = context_client, topology_uuid = "test1", context_uuid = "test1") +# LOGGER.info(f"Topology: {response}") +# assert response is not None + +# def test_set_kpi_descriptor_and_get_device_id(kpi_manager_client): +# kpi_descriptor = _create_kpi_descriptor("1290fb71-bf15-5528-8b69-2d2fabe1fa18") +# kpi_id = kpi_manager_client.SetKpiDescriptor(kpi_descriptor) +# LOGGER.info(f"KPI Descriptor set: {kpi_id}") +# assert kpi_id is not None + +# response = kpi_manager_client.GetKpiDescriptor(kpi_id) +# # response = kpi_manager_client.GetKpiDescriptor(_create_kpi_id()) + +# assert response is not None +# LOGGER.info(f"KPI Descriptor: {response}") +# LOGGER.info(f"Device Id: {response.device_id.device_uuid.uuid}") +# LOGGER.info(f"Endpoint Id: {response.endpoint_id.endpoint_uuid.uuid}") + +# # ----- Get endpoint detail using device ID ----- +# def test_get_device_details(context_client): +# response = get_device(context_client = context_client, device_uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18", include_config_rules = False, include_components = False) +# if response: +# LOGGER.info(f"Device type: {response.device_type}") +# for endpoint in response.device_endpoints: +# if endpoint.endpoint_id.endpoint_uuid.uuid == '36571df2-bac1-5909-a27d-5f42491d2ff0': +# endpoint_dict = {} +# kpi_sample_types = [] +# # LOGGER.info(f"Endpoint: {endpoint}") +# # LOGGER.info(f"Enpoint_uuid: {endpoint.endpoint_id.endpoint_uuid.uuid}") +# endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid +# # LOGGER.info(f"Enpoint_name: {endpoint.name}") +# endpoint_dict["name"] = endpoint.name +# # LOGGER.info(f"Enpoint_type: {endpoint.endpoint_type}") +# endpoint_dict["type"] = endpoint.endpoint_type +# for sample_type in endpoint.kpi_sample_types: +# # LOGGER.info(f"Enpoint_sample_types: {sample_type}") +# kpi_sample_types.append(sample_type) +# endpoint_dict["sample_types"] = kpi_sample_types +# LOGGER.info(f"Extracted endpoint dict: {endpoint_dict}") +# else: +# LOGGER.info(f"Endpoint not matched") +# LOGGER.info(f"Device Type: {type(response)}") +# assert response is not None + +# # ----- List Conetxts ----- +# def test_list_contextIds(context_client): +# empty = Empty() +# response = context_client.ListContexts(empty) +# LOGGER.info(f"Contexts: {response}") +# assert response + +# # ----- List Devices ----- +# def test_list_devices(context_client): +# empty = Empty() +# response = context_client.ListDeviceIds(empty) +# LOGGER.info(f"Devices: {response}") +# assert response + +# ----- Get Endpoints ----- TODO: get_endpoint_names method doesn't return KPI samples types +# def test_get_endpoints(context_client): +# device_id = DeviceId() +# device_id.device_uuid.uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18" +# endpoint_id = EndPointId() +# endpoint_id.endpoint_uuid.uuid = "43b817fa-246f-5e0a-a2e3-2aad0b3e16ca" +# endpoint_id.device_id.CopyFrom(device_id) +# response = get_endpoint_names(context_client = context_client, endpoint_ids = [endpoint_id]) +# LOGGER.info(f"Endpoints: {response}") +# assert response is not None + +# # ----- List Topologies ----- +# def test_list_topologies(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# respone = context_client.ListTopologies(context_id) +# LOGGER.info(f"Topologies: {respone}") + +# # ----- Remove Topology ----- +# def test_remove_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# topology_id = TopologyId() +# topology_id.topology_uuid.uuid = "9ef0118c-4bca-5e81-808b-dc8f60e2cda4" +# topology_id.context_id.CopyFrom(context_id) + +# response = context_client.RemoveTopology(topology_id) +# LOGGER.info(f"Topology removed: {response}") + +# # ----- Remove context ----- +# def test_remove_context(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# response = context_client.RemoveContext(context_id) +# LOGGER.info(f"Context removed: {response}") + @pytest.fixture def telemetryBackend_service(): LOGGER.info('Initializing TelemetryBackendService...') @@ -54,15 +176,6 @@ def telemetryBackend_service(): def test_InitiateCollectorBackend(telemetryBackend_service): LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") - time.sleep(300) + time.sleep(30) LOGGER.info(" Backend Timer Finished Successfully. ") -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) - -# # Call load_topology from the add_devices.py file -# def test_load_topology(context_client, device_client): -# load_topology(context_client, device_client) diff --git a/src/telemetry/backend/tests/topology.json b/src/telemetry/backend/tests/topology.json new file mode 100644 index 0000000000000000000000000000000000000000..6416130b924441e959fcdb7001b7c1b51df172d8 --- /dev/null +++ b/src/telemetry/backend/tests/topology.json @@ -0,0 +1,148 @@ +{ + "contexts": [ + {"context_id": {"context_uuid": {"uuid": "admin"}}} + ], + "topologies": [ + {"topology_id": {"context_id": {"context_uuid": {"uuid": "admin"}}, "topology_uuid": {"uuid": "admin"}}} + ], + "devices": [ + { + "device_id": {"device_uuid": {"uuid": "DE1"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 102], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 102], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [201, 202], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [202, 203], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [201, 203], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE2"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 101], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [202, 201], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [203, 201], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [203, 202], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [102 ], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE3"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE4"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + } + ], + "links": [ + + { + "link_id": {"link_uuid": {"uuid": "DE1/2/2==DE2/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/3==DE3/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/4==DE4/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE2/2/1==DE1/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/3==DE3/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/4==DE4/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE3/2/1==DE1/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE3/2/2==DE2/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE4/2/1==DE1/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/3==DE3/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + } + ] +} diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 1ef8ed46b049ec96daffffc46748e51f55144df9..955036495f670dc8d126a0682917dfc90acba185 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -143,58 +143,6 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - # print('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) - - # ---------- Independent Method --------------- - # Listener method is independent of any method (same lifetime as service) - # continously listens for responses - def install_servicers(self): - threading.Thread(target=self.ResponseListener).start() - - def ResponseListener(self): - """ - listener for response on Kafka topic. - """ - self.kafka_consumer.subscribe([KafkaTopic.TELEMETRY_RESPONSE.value]) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - # print("Consumer error: {:}".format(receive_msg.error())) - LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - kpi_value = json.loads(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) - else: - # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) - except Exception as e: - # print(f"Error extarcting msg key or value: {str(e)}") - LOGGER.info("Error extarcting msg key or value: {:}".format(e)) - continue - - def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): - if kpi_id == "-1" and kpi_value == -1: - # print ("Backend termination confirmation for collector id: ", collector_id) - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - else: - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index e1b9dba4e97fe30b962a1deb9050c67671cbe976..874b34b8c7ae7800b323d427e9347798b22cf7bc 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -18,6 +18,8 @@ from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService from telemetry.database.TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() LOGGER = None @@ -43,6 +45,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 177bcc0b7e3829d2cdfd54c404618af9ebe43161..d766f68fac4fdf978543cc94a151fbca81d9b0de 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -30,16 +30,17 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_collector_request.kpi_id.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832" # _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.duration_s = -1 - _create_collector_request.interval_s = float(random.randint(3, 5)) + _create_collector_request.duration_s = float(random.randint(40, 60)) + _create_collector_request.interval_s = float(random.randint(5, 7)) return _create_collector_request def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() kpi_id_obj = KpiId() # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) - kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f" + kpi_id_obj.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832" _create_collector_filter.kpi_id.append(kpi_id_obj) return _create_collector_filter diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 6c6107152f950cbe565f109b1757843a5c6165e8..767a1f73f2ebd73f88c71ac44e8ce87efb37bebd 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -90,13 +90,6 @@ def telemetryFrontend_client( # Tests Implementation of Telemetry Frontend ########################### -# ------- Re-structuring Test --------- -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - # ----- core funtionality test ----- def test_StartCollector(telemetryFrontend_client): # LOGGER.info(' >>> test_StartCollector START: <<< ') @@ -104,18 +97,17 @@ def test_StartCollector(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorId) +def test_SelectCollectors(telemetryFrontend_client): + LOGGER.info(' >>> test_SelectCollectors START: <<< ') + response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorList) def test_StopCollector(telemetryFrontend_client): # LOGGER.info(' >>> test_StopCollector START: <<< ') - LOGGER.info("Waiting before termination...") - time.sleep(30) + # LOGGER.info("Waiting before termination...") + # time.sleep(30) response = telemetryFrontend_client.StopCollector(create_collector_id()) LOGGER.debug(str(response)) assert isinstance(response, Empty) -# def test_SelectCollectors(telemetryFrontend_client): -# LOGGER.info(' >>> test_SelectCollectors START: <<< ') -# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorList) -