diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index a6f996932011f6838c8a1e96a325a6a9d885fbd9..e01b33896f439204eaf770cf58c7d9852a797611 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -38,7 +38,7 @@ class KafkaTopic(Enum): """ all_topics = [member.value for member in KafkaTopic] if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): - LOGGER.debug("All topics created sucsessfully") + LOGGER.debug("All topics are created sucsessfully") return True else: LOGGER.debug("Error creating all topics") @@ -70,4 +70,4 @@ class KafkaTopic(Enum): return False return True -# create all topics after the deployments (Telemetry and Analytics) \ No newline at end of file +# create all topics after the deployments (Telemetry and Analytics) diff --git a/src/kpi_manager/README.md b/src/kpi_manager/README.md index 72ba6e5594adeef4a29d650615716c26273ed115..c1feadcc4843db26a219d1e3b37833ddd80b18dc 100644 --- a/src/kpi_manager/README.md +++ b/src/kpi_manager/README.md @@ -26,4 +26,4 @@ The following requirements should be fulfilled before the execuation of KPI mana 4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. ## For KPI composer and KPI writer -The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). \ No newline at end of file +The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). diff --git a/src/kpi_manager/client/KpiManagerClient.py b/src/kpi_manager/client/KpiManagerClient.py index 0b7979dbbb4d8149fde1171208488adebfc34aee..9e7079ae49c4a76757c3c979d322d23dce0fa92e 100755 --- a/src/kpi_manager/client/KpiManagerClient.py +++ b/src/kpi_manager/client/KpiManagerClient.py @@ -65,10 +65,8 @@ class KpiManagerClient: @RETRY_DECORATOR def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: - print('---> GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetKpiDescriptor(request) - print('---> GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response @@ -77,4 +75,4 @@ class KpiManagerClient: LOGGER.debug('SelectKpiDescriptor: {:s}'.format(grpc_message_to_json_string(filter))) response = self.stub.SelectKpiDescriptor(filter) LOGGER.debug('SelectKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) - return response \ No newline at end of file + return response diff --git a/src/kpi_manager/database/KpiModel.py b/src/kpi_manager/database/KpiModel.py index b8794ef686e3cae23470c9dd4de970f2248d6850..5c2fdff0664883bcc727096ddeda562fdbe3085d 100644 --- a/src/kpi_manager/database/KpiModel.py +++ b/src/kpi_manager/database/KpiModel.py @@ -14,7 +14,7 @@ import logging from sqlalchemy.dialects.postgresql import UUID -from sqlalchemy import Column, Integer, String, Float, Text +from sqlalchemy import Column, Integer, String, Text from sqlalchemy.orm import registry from common.proto.kpi_manager_pb2 import KpiDescriptor @@ -23,7 +23,6 @@ LOGGER = logging.getLogger(__name__) # Create a base class for declarative models Base = registry().generate_base() -# Base = declarative_base() class Kpi(Base): __tablename__ = 'kpi' @@ -82,4 +81,4 @@ class Kpi(Base): response.endpoint_id.endpoint_uuid.uuid = row.endpoint_id response.connection_id.connection_uuid.uuid = row.connection_id response.link_id.link_uuid.uuid = row.link_id - return response \ No newline at end of file + return response diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py index 6ab2c52f65a28f8c26e5c501aaaf1a591688d81f..5b2b586b6d72cf73ff3362f9240d1171e8a9974d 100644 --- a/src/kpi_manager/database/Kpi_DB.py +++ b/src/kpi_manager/database/Kpi_DB.py @@ -151,4 +151,4 @@ class KpiDB: LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) finally: - session.close() \ No newline at end of file + session.close() diff --git a/src/kpi_manager/service/KpiManagerService.py b/src/kpi_manager/service/KpiManagerService.py index 6c8c663939e42dc1e632235b6bd239ea72f18eeb..b69a926a94c6cf10a680fe1b15d065f6bc073c97 100755 --- a/src/kpi_manager/service/KpiManagerService.py +++ b/src/kpi_manager/service/KpiManagerService.py @@ -14,17 +14,16 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from .NameMapping import NameMapping from common.tools.service.GenericGrpcService import GenericGrpcService from common.proto.kpi_manager_pb2_grpc import add_KpiManagerServiceServicer_to_server from kpi_manager.service.KpiManagerServiceServicerImpl import KpiManagerServiceServicerImpl class KpiManagerService(GenericGrpcService): - def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: + def __init__(self, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) super().__init__(port, cls_name=cls_name) - self.kpiManagerService_servicer = KpiManagerServiceServicerImpl(name_mapping) + self.kpiManagerService_servicer = KpiManagerServiceServicerImpl() def install_servicers(self): add_KpiManagerServiceServicer_to_server(self.kpiManagerService_servicer, self.server) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 0ac7dd76bc18fcd8670cca2ad0c9bfdfed329174..05292fc5b14feaf079cc7691c650775965cc9148 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -14,24 +14,18 @@ import logging, grpc -from typing import List, Set -from sqlalchemy.sql.expression import BinaryExpression from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.context_pb2 import Empty from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList -from .NameMapping import NameMapping from kpi_manager.database.Kpi_DB import KpiDB from kpi_manager.database.KpiModel import Kpi as KpiModel LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC') -class IDNotFoundError(Exception): - ... - class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): - def __init__(self, name_mapping : NameMapping): + def __init__(self): LOGGER.info('Init KpiManagerService') self.kpi_db_obj = KpiDB() diff --git a/src/kpi_manager/service/NameMapping.py b/src/kpi_manager/service/NameMapping.py deleted file mode 100644 index f98e367b17b4a2e4c7c6f3dcdb90dfb8ee24d3ad..0000000000000000000000000000000000000000 --- a/src/kpi_manager/service/NameMapping.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -from typing import Dict, Optional - -class NameMapping: - def __init__(self) -> None: - self.__lock = threading.Lock() - self.__device_to_name : Dict[str, str] = dict() - self.__endpoint_to_name : Dict[str, str] = dict() - - def get_device_name(self, device_uuid : str) -> Optional[str]: - with self.__lock: - return self.__device_to_name.get(device_uuid) - - def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: - with self.__lock: - return self.__endpoint_to_name.get(endpoint_uuid) - - def set_device_name(self, device_uuid : str, device_name : str) -> None: - with self.__lock: - self.__device_to_name[device_uuid] = device_name - - def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: - with self.__lock: - self.__endpoint_to_name[endpoint_uuid] = endpoint_name - - def delete_device_name(self, device_uuid : str) -> None: - with self.__lock: - self.__device_to_name.pop(device_uuid, None) - - def delete_endpoint_name(self, endpoint_uuid : str) -> None: - with self.__lock: - self.__endpoint_to_name.pop(endpoint_uuid, None) diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py index 132a152194adb7780f35550483e14a2414df1cf8..a4884c545debaa632a9ff0cdbc458949ab8f3458 100644 --- a/src/kpi_manager/service/__main__.py +++ b/src/kpi_manager/service/__main__.py @@ -36,21 +36,12 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - wait_for_environment_variables([ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ]) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.debug('Starting...') - name_mapping = NameMapping() - - grpc_service = KpiManagerService(name_mapping) + grpc_service = KpiManagerService() grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 6294d1969e71400ea8d1702314bd0cdde374341a..870660658a2808fc6db2e98a140497980022e5a7 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -23,12 +23,12 @@ def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member - _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' # pylint: disable=maybe-no-member + _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' + _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' return _create_kpi_request def create_kpi_descriptor_request_a(description: str = "Test Description"): @@ -36,12 +36,12 @@ def create_kpi_descriptor_request_a(description: str = "Test Description"): _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_request.kpi_description = description _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member - _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' # pylint: disable=maybe-no-member + _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' return _create_kpi_request def create_kpi_filter_request(): diff --git a/src/kpi_value_api/client/KpiValueApiClient.py b/src/kpi_value_api/client/KpiValueApiClient.py index adf17da5d283fec66d3cd24e0fb7b000f877b3e0..f432271cfb7c8136f72156330b25d0b82b934d99 100644 --- a/src/kpi_value_api/client/KpiValueApiClient.py +++ b/src/kpi_value_api/client/KpiValueApiClient.py @@ -20,7 +20,7 @@ from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import Empty -from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList, KpiValueType, KpiValueFilter +from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub LOGGER = logging.getLogger(__name__) @@ -60,4 +60,4 @@ class KpiValueApiClient: LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectKpiValues(request) LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) - return response \ No newline at end of file + return response diff --git a/src/kpi_value_api/service/KpiValueApiService.py b/src/kpi_value_api/service/KpiValueApiService.py index 2fb24aaacff82ce7c33cce6e43e2f50f4af14fbe..68b6fbdc278a00aa7cf98385bcf8afa573f91445 100644 --- a/src/kpi_value_api/service/KpiValueApiService.py +++ b/src/kpi_value_api/service/KpiValueApiService.py @@ -13,7 +13,6 @@ # limitations under the License. -from .NameMapping import NameMapping from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from common.tools.service.GenericGrpcService import GenericGrpcService @@ -22,10 +21,10 @@ from common.proto.kpi_value_api_pb2_grpc import add_KpiValueAPIServiceServicer_t class KpiValueApiService(GenericGrpcService): - def __init__(self, name_mapping : NameMapping, cls_name : str = __name__ ) -> None: + def __init__(self, cls_name : str = __name__ ) -> None: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) super().__init__(port, cls_name=cls_name) - self.kpiValueApiService_servicer = KpiValueApiServiceServicerImpl(name_mapping) + self.kpiValueApiService_servicer = KpiValueApiServiceServicerImpl() def install_servicers(self): - add_KpiValueAPIServiceServicer_to_server(self.kpiValueApiService_servicer, self.server) \ No newline at end of file + add_KpiValueAPIServiceServicer_to_server(self.kpiValueApiService_servicer, self.server) diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 3ecf20c08a6e280ce346273e5c154e0cdd565c1d..d27de54f3cddfd0d70d656a89c45adc50e518289 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json, requests -from typing import Tuple, Any, List, Dict +import logging, grpc, requests +from typing import Tuple, Any from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -24,41 +24,35 @@ from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValu from confluent_kafka import Producer as KafkaProducer -from .NameMapping import NameMapping - LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') PROM_URL = "http://localhost:9090" class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): - def __init__(self, name_mapping : NameMapping): + def __init__(self): LOGGER.debug('Init KpiValueApiService') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) - producer_obj = KafkaProducer({ 'bootstrap.servers' : KafkaConfig.SERVER_IP.value }) - for kpi_value in request.kpi_value_list: kpi_value_to_produce : Tuple [str, Any, Any] = ( - kpi_value.kpi_id.kpi_id, # kpi_value.kpi_id.kpi_id.uuid - kpi_value.timestamp, # kpi_value.timestamp.timestamp + kpi_value.kpi_id.kpi_id, + kpi_value.timestamp, kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? ) LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used - # write this KPI to Kafka producer_obj.produce( KafkaTopic.VALUE.value, - key = msg_key, - # value = json.dumps(kpi_value_to_produce), - value = kpi_value.SerializeToString(), + key = msg_key, + value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce), callback = self.delivery_callback ) producer_obj.flush() diff --git a/src/kpi_value_api/service/NameMapping.py b/src/kpi_value_api/service/NameMapping.py deleted file mode 100644 index f98e367b17b4a2e4c7c6f3dcdb90dfb8ee24d3ad..0000000000000000000000000000000000000000 --- a/src/kpi_value_api/service/NameMapping.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -from typing import Dict, Optional - -class NameMapping: - def __init__(self) -> None: - self.__lock = threading.Lock() - self.__device_to_name : Dict[str, str] = dict() - self.__endpoint_to_name : Dict[str, str] = dict() - - def get_device_name(self, device_uuid : str) -> Optional[str]: - with self.__lock: - return self.__device_to_name.get(device_uuid) - - def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: - with self.__lock: - return self.__endpoint_to_name.get(endpoint_uuid) - - def set_device_name(self, device_uuid : str, device_name : str) -> None: - with self.__lock: - self.__device_to_name[device_uuid] = device_name - - def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: - with self.__lock: - self.__endpoint_to_name[endpoint_uuid] = endpoint_name - - def delete_device_name(self, device_uuid : str) -> None: - with self.__lock: - self.__device_to_name.pop(device_uuid, None) - - def delete_endpoint_name(self, endpoint_uuid : str) -> None: - with self.__lock: - self.__endpoint_to_name.pop(endpoint_uuid, None) diff --git a/src/kpi_value_api/service/__main__.py b/src/kpi_value_api/service/__main__.py index 1a8707112601f317ab111ff19329d00620ad3284..8b4ebe296e2c4f193aa1fc99aede9364556c2094 100644 --- a/src/kpi_value_api/service/__main__.py +++ b/src/kpi_value_api/service/__main__.py @@ -12,10 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, signal, sys, threading, time +import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level -from .NameMapping import NameMapping # import updated from .KpiValueApiService import KpiValueApiService terminate = threading.Event() @@ -37,9 +36,7 @@ def main(): LOGGER.debug('Starting...') - name_mapping = NameMapping() - - grpc_service = KpiValueApiService(name_mapping) + grpc_service = KpiValueApiService() grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index fc883db1f845905fb55eb3a4532f97eeae3c8a49..c2a1cbb0b275fb26d6498e4470f3869a105a8d36 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -32,4 +32,4 @@ def create_kpi_value_list(): _create_kpi_value_list.kpi_value_list.append(kpi_value_object) - return _create_kpi_value_list \ No newline at end of file + return _create_kpi_value_list diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index 6c2858659d75fbf6f9f7df1e691db91797d50701..307b5cdad4e6503a774e308f669fc44762f84bf1 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -14,35 +14,28 @@ import os, logging, pytest - from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.tools.kafka.Variables import KafkaTopic from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) - - -from kpi_value_api.service.NameMapping import NameMapping from kpi_value_api.service.KpiValueApiService import KpiValueApiService from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient from kpi_value_api.tests.messages import create_kpi_value_list LOCAL_HOST = '127.0.0.1' - KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore os.environ[get_env_var_name(ServiceNameEnum.KPIVALUEAPI, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.KPIVALUEAPI, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIVALUEAPI_SERVICE_PORT) - LOGGER = logging.getLogger(__name__) # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') def kpi_value_api_service(): LOGGER.info('Initializing KpiValueApiService...') - name_mapping = NameMapping() # _service = MonitoringService(name_mapping) - _service = KpiValueApiService(name_mapping) + _service = KpiValueApiService() _service.start() # yield the server, when test finishes, execution will resume to stop it @@ -85,9 +78,7 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) - def test_store_kpi_values(kpi_value_api_client): LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) assert isinstance(response, Empty) - diff --git a/src/kpi_value_writer/service/KpiValueComposer.py b/src/kpi_value_writer/service/KpiValueComposer.py deleted file mode 100644 index e2f315eda124e6a2d31208ff9194e8600694ceff..0000000000000000000000000000000000000000 --- a/src/kpi_value_writer/service/KpiValueComposer.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# read Kafka stream from Kafka topic - -import re -import logging -import threading -from confluent_kafka import KafkaError -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import Consumer as KafkaConsumer -from kpi_management.service.database.Kpi_DB import Kpi_DB -from kpi_management.service.database.KpiModel import Kpi as KpiModel - -LOGGER = logging.getLogger(__name__) -# KAFKA_SERVER_IP = '10.152.183.175:30092' -KAFKA_SERVER_IP = '127.0.0.1:9092' -# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) -KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} -PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} -CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'kpi_composer', - 'auto.offset.reset' : 'latest'} -KPIs_TO_SEARCH = ["node_network_receive_packets_total", - "node_network_receive_bytes_total", - "node_network_transmit_bytes_total", - "process_open_fds"] -DB_TABLE_NAME = KpiModel - -class KpiValueComposer: - def __init__(self) -> None: - pass - - @staticmethod - def compose_kpi(): - threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() - - @staticmethod - def kafka_listener(): - """ - listener for events on Kafka topic. - """ - kafka_consumer = KafkaConsumer(CONSUMER_CONFIG) - kafka_consumer.subscribe([KAFKA_TOPICS['raw']]) - while True: - receive_msg = kafka_consumer.poll(2.0) - if receive_msg is None: - # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw']) # added for debugging purposes - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print("Consumer error: {}".format(receive_msg.error())) - continue - try: - new_event = receive_msg.value().decode('utf-8') - KpiValueComposer.process_event_and_label_kpi(new_event) - except Exception as e: - print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") - continue - - @staticmethod - def process_event_and_label_kpi(event): - pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) - lines = event.split('\n') - # matching_rows = [] - sub_names = kpi_value = "" - for line in lines: - try: - if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE") and not 'device="lo"' in line: - (kpi_name, kpi_value) = line.split(" ") - if kpi_name.endswith('}'): - (kpi_name, sub_names) = kpi_name.replace('}','').split('{') - print("Received KPI from raw topic: {:}".format((kpi_name, sub_names, kpi_value))) - kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) - if kpi_descriptor is not None: - kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) - producerObj = KafkaProducer(PRODUCER_CONFIG) - producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback) - producerObj.flush() - else: - print ("No matching of KPI ({:}) found in db".format(kpi_name)) - except Exception as e: - print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e)) - - @staticmethod - def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): # = KPIs_TO_SEARCH[0] is added for testing - col_name = "kpi_description" - kpiDBobj = Kpi_DB() - row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) - if row is not None: - LOGGER.info("Extracted Row: {:}".format(row)) - return row - else: - return None - - @staticmethod - def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value): - # Creating a dictionary from the kpi_descriptor's attributes - kpi_dict = { - 'kpi_id' : kpi_descriptor.kpi_id, - 'kpi_description': kpi_descriptor.kpi_description, - 'kpi_sample_type': kpi_descriptor.kpi_sample_type, - 'device_id' : kpi_descriptor.device_id, - 'endpoint_id' : kpi_descriptor.endpoint_id, - 'service_id' : kpi_descriptor.service_id, - 'slice_id' : kpi_descriptor.slice_id, - 'connection_id' : kpi_descriptor.connection_id, - 'link_id' : kpi_descriptor.link_id, - 'kpi_value' : kpi_value - } - return kpi_dict - - @staticmethod - def delivery_callback( err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - print(f'Message delivery failed: {err}') - else: - print(f'Message delivered to topic {msg.topic()}') \ No newline at end of file diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 73dd0a20bc3ae1d4cf8508278e2c34d5f987a77e..aa0f2cf79128e962801fda457edb8256ee4735b1 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -12,23 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc -import json import logging import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic - from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId -from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub from confluent_kafka import KafkaError from confluent_kafka import Consumer as KafkaConsumer from kpi_manager.client.KpiManagerClient import KpiManagerClient -from monitoring.service.NameMapping import NameMapping -from kpi_manager.service.KpiManagerService import KpiManagerService - # -- test import -- from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request from .MetricWriterToPrometheus import MetricWriterToPrometheus @@ -38,7 +31,6 @@ LOGGER = logging.getLogger(__name__) ACTIVE_CONSUMERS = [] class KpiValueWriter: - @staticmethod def RunKafkaConsumer(): thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) diff --git a/src/kpi_value_writer/service/NameMapping.py b/src/kpi_value_writer/service/NameMapping.py deleted file mode 100644 index f98e367b17b4a2e4c7c6f3dcdb90dfb8ee24d3ad..0000000000000000000000000000000000000000 --- a/src/kpi_value_writer/service/NameMapping.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -from typing import Dict, Optional - -class NameMapping: - def __init__(self) -> None: - self.__lock = threading.Lock() - self.__device_to_name : Dict[str, str] = dict() - self.__endpoint_to_name : Dict[str, str] = dict() - - def get_device_name(self, device_uuid : str) -> Optional[str]: - with self.__lock: - return self.__device_to_name.get(device_uuid) - - def get_endpoint_name(self, endpoint_uuid : str) -> Optional[str]: - with self.__lock: - return self.__endpoint_to_name.get(endpoint_uuid) - - def set_device_name(self, device_uuid : str, device_name : str) -> None: - with self.__lock: - self.__device_to_name[device_uuid] = device_name - - def set_endpoint_name(self, endpoint_uuid : str, endpoint_name : str) -> None: - with self.__lock: - self.__endpoint_to_name[endpoint_uuid] = endpoint_name - - def delete_device_name(self, device_uuid : str) -> None: - with self.__lock: - self.__device_to_name.pop(device_uuid, None) - - def delete_endpoint_name(self, endpoint_uuid : str) -> None: - with self.__lock: - self.__endpoint_to_name.pop(endpoint_uuid, None) diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py index 3e1d8989f4492c47f1e78e57a107ccb033d3f4c0..028585575c65874658d3d29efaf0dee0ce4deaae 100644 --- a/src/kpi_value_writer/service/__main__.py +++ b/src/kpi_value_writer/service/__main__.py @@ -14,7 +14,6 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from .NameMapping import NameMapping # import updated from .KpiValueWriter import KpiValueWriter from common.Settings import get_log_level, get_metrics_port @@ -37,11 +36,7 @@ def main(): LOGGER.debug('Starting...') - start_http_server(get_metrics_port) # add Prometheus client port - - name_mapping = NameMapping() - - grpc_service = KpiValueWriter(name_mapping) + grpc_service = KpiValueWriter() grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/kpi_value_writer/tests/test_kpi_composer.py b/src/kpi_value_writer/tests/test_kpi_composer.py deleted file mode 100644 index fa75ba2ab67b73565b408b9404e90e0eab9c2ae6..0000000000000000000000000000000000000000 --- a/src/kpi_value_writer/tests/test_kpi_composer.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -import logging -from kpi_manager.service.KpiValueComposer import KpiValueComposer - -LOGGER = logging.getLogger(__name__) - -def test_compose_kpi(): - LOGGER.info(' >>> test_compose_kpi START <<< ') - KpiValueComposer.compose_kpi() - -# def test_request_kpi_descriptor_from_db(): -# LOGGER.info(' >>> test_request_kpi_descriptor_from_db START <<< ') -# KpiValueComposer.request_kpi_descriptor_from_db() - -# def test_delete_kpi_by_id(): -# LOGGER.info(' >>> test_test_delete_kpi_by_id START <<< ') -# KpiValueComposer.delete_kpi_by_id() \ No newline at end of file diff --git a/src/kpi_value_writer/tests/test_messages.py b/src/kpi_value_writer/tests/test_messages.py index 64add9a63a911eb083c11887e88f92689bbc4e52..89a41fa08ad37b7d9b305bba6e7c445fea5cd18a 100755 --- a/src/kpi_value_writer/tests/test_messages.py +++ b/src/kpi_value_writer/tests/test_messages.py @@ -23,18 +23,17 @@ def create_kpi_id_request(): _create_kpi_id.kpi_id.uuid = str(uuid.uuid4()) return _create_kpi_id - def create_kpi_descriptor_request(description: str = "Test Description"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_request.kpi_description = description _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member - _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member - _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' # pylint: disable=maybe-no-member + _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' return _create_kpi_request def create_kpi_value_request(): @@ -42,4 +41,4 @@ def create_kpi_value_request(): _create_kpi_value_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_value_request.timestamp.timestamp = time.time() _create_kpi_value_request.kpi_value_type.floatVal = random.randint(10, 10000) - return _create_kpi_value_request \ No newline at end of file + return _create_kpi_value_request diff --git a/src/kpi_value_writer/tests/test_metric_writer_to_prom.py b/src/kpi_value_writer/tests/test_metric_writer_to_prom.py index 44f12ecea1a434793649d30bdd7ea5edf1d88e7d..f60e96253ae8edb29eedcbe2d6e66aaeb450229c 100644 --- a/src/kpi_value_writer/tests/test_metric_writer_to_prom.py +++ b/src/kpi_value_writer/tests/test_metric_writer_to_prom.py @@ -26,4 +26,3 @@ def test_metric_writer_to_prometheus(): create_kpi_descriptor_request(), create_kpi_value_request() ) -