Commit 5883eac9 authored by Waleed Akbar

KPI Value writer and API in progress

parent b3f13a9f
2 merge requests: !294 (Release TeraFlowSDN 4.0), !207 (Resolve "(CTTC) Separation of Monitoring")
@@ -12,10 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 from enum import Enum
+from confluent_kafka import KafkaException
+from confluent_kafka.admin import AdminClient, NewTopic
+
+LOGGER = logging.getLogger(__name__)
+
 class KafkaConfig(Enum):
     SERVER_IP = "127.0.0.1:9092"
+    ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP})

 class KafkaTopic(Enum):
     REQUEST = 'topic_request'
@@ -24,4 +31,42 @@ class KafkaTopic(Enum):
     LABELED = 'topic_labeled'
     VALUE   = 'topic_value'
+
+    @staticmethod
+    def create_all_topics() -> bool:
+        """
+        Method to create Kafka topics defined as class members.
+        """
+        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values()))
+        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys()))
+        # LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic]))
+        all_topics = [member.value for member in KafkaTopic]
+        if KafkaTopic.create_new_topic_if_not_exists(all_topics):
+            LOGGER.debug("All topics created successfully")
+            return True
+        else:
+            LOGGER.debug("Error creating all topics")
+            return False
+
+    @staticmethod
+    def create_new_topic_if_not_exists(new_topics: list) -> bool:
+        """
+        Method to create a Kafka topic if it does not already exist.
+        Args:
+            new_topics (list): topic name(s) to be created on Kafka
+        """
+        LOGGER.debug("Received topic list: {:}".format(new_topics))
+        for topic in new_topics:
+            try:
+                topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
+                if topic not in topic_metadata.topics:
+                    # If the topic does not exist, create a new topic
+                    print(f"Topic '{topic}' does not exist. Creating...")
+                    LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
+                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
+                    KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
+            except Exception as e:
+                LOGGER.debug("Failed to create topic: {:}".format(e))
+                return False
+        return True
 # create all topics after the deployments (Telemetry and Analytics)
\ No newline at end of file
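
Note on the topic-creation hunk above: AdminClient.create_topics() in confluent_kafka is asynchronous and returns a dict of futures, which the new create_new_topic_if_not_exists() never inspects, so a failed creation only becomes visible on a later list_topics() call. A minimal sketch of waiting on those futures (same broker address as KafkaConfig.SERVER_IP; not part of this commit):

    from confluent_kafka.admin import AdminClient, NewTopic

    admin_client = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
    futures = admin_client.create_topics([NewTopic('topic_value', num_partitions=1, replication_factor=1)])
    for topic_name, future in futures.items():
        try:
            future.result()   # blocks; raises (e.g. KafkaException) if the creation failed
            print("Topic '{:}' created".format(topic_name))
        except Exception as e:
            print("Failed to create topic '{:}': {:}".format(topic_name, e))
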
@@ -31,7 +31,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     terminate.set()

 def start_kpi_manager(name_mapping : NameMapping):
-    LOGGER.info('Start Monitoring...',)
+    LOGGER.info('Start Kpi Manager...',)
     events_collector = EventsDeviceCollector(name_mapping)
     events_collector.start()
...
@@ -211,14 +211,14 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
     # LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
     # assert isinstance(del_response, Empty)

-# def test_GetKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
-#     # adding KPI
-#     response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     # get KPI
-#     response = kpi_manager_client.GetKpiDescriptor(response_id)
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiDescriptor)
+def test_GetKpiDescriptor(kpi_manager_client):
+    LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
+    # adding KPI
+    response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+    # get KPI
+    response = kpi_manager_client.GetKpiDescriptor(response_id)
+    LOGGER.info("Response gRPC message object: {:}".format(response))
+    assert isinstance(response, KpiDescriptor)

 # def test_SelectKpiDescriptor(kpi_manager_client):
 #     LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
@@ -229,12 +229,12 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
 #     LOGGER.info("Response gRPC message object: {:}".format(response))
 #     assert isinstance(response, KpiDescriptorList)

-def test_set_list_of_KPIs(kpi_manager_client):
-    LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
-    KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
-    # adding KPI
-    for kpi in KPIs_TO_SEARCH:
-        kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
+# def test_set_list_of_KPIs(kpi_manager_client):
+#     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
+#     KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
+#     # adding KPI
+#     for kpi in KPIs_TO_SEARCH:
+#         kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))

 # ---------- 2nd Iteration Tests -----------------
...
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import logging, grpc
-from typing import Tuple, Any
+import logging, grpc, json
+from typing import Tuple, Any, List, Dict
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -37,22 +37,30 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
     def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                        ) -> Empty:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
-        producer_obj = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_IP.value})
+        producer_obj = KafkaProducer({
+            'bootstrap.servers' : KafkaConfig.SERVER_IP.value
+        })
         for kpi_value in request.kpi_value_list:
             kpi_value_to_produce : Tuple [str, Any, Any] = (
                 kpi_value.kpi_id.kpi_id,    # kpi_value.kpi_id.kpi_id.uuid
                 kpi_value.timestamp,        # kpi_value.timestamp.timestamp
-                kpi_value.kpi_value_type    # kpi_value.kpi_value_type.(many options)
+                kpi_value.kpi_value_type    # kpi_value.kpi_value_type.(many options) how?
             )
             LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
             msg_key = "gRPC-KpiValueApi"    # str(__class__.__name__)
             # write this KPI to Kafka
-            producer_obj.produce(KafkaTopic.VALUE.value,
-                key = msg_key,
-                value = str(kpi_value_to_produce),
-                callback = self.delivery_callback
-                )
+            producer_obj.produce(
+                KafkaTopic.VALUE.value,
+                key      = msg_key,
+                # value  = json.dumps(kpi_value_to_produce),
+                value    = kpi_value.SerializeToString(),
+                callback = self.delivery_callback
+            )
         producer_obj.flush()
         return Empty()

     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
@@ -61,5 +69,5 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request))

     def delivery_callback(self, err, msg):
-        if err: print(f'Message delivery failed: {err}')
-        else: print(f'Message delivered to topic {msg.topic()}')
+        if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
+        else: print('Message delivered to topic {:}'.format(msg.topic()))
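
The produce() change above switches the Kafka message value from a stringified tuple to the protobuf-serialized KpiValue, so consumers must call KpiValue.ParseFromString() on the raw bytes (as the new KpiValueWriter further below does). A minimal round-trip sketch without Kafka; the field path comes from the comments in this hunk, and the UUID value is made up:

    from common.proto.kpi_value_api_pb2 import KpiValue

    original = KpiValue()
    original.kpi_id.kpi_id.uuid = '1234-abcd'    # hypothetical UUID, field path taken from the hunk above
    raw_bytes = original.SerializeToString()     # what StoreKpiValues() publishes on 'topic_value'

    received = KpiValue()
    received.ParseFromString(raw_bytes)          # what a consumer does with msg.value()
    assert received.kpi_id.kpi_id.uuid == original.kpi_id.kpi_id.uuid
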
@@ -17,14 +17,17 @@ import os, logging, pytest
 from common.proto.context_pb2 import Empty
 from common.Constants import ServiceNameEnum
+from common.tools.kafka.Variables import KafkaTopic
 from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
 from kpi_value_api.service.NameMapping import NameMapping
 from kpi_value_api.service.KpiValueApiService import KpiValueApiService
 from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
 from kpi_value_api.tests.messages import create_kpi_value_list

 LOCAL_HOST = '127.0.0.1'
 KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)   # type: ignore
@@ -77,6 +80,12 @@ def kpi_value_api_client(kpi_value_api_service : KpiValueApiService ):
 # Tests Implementation of Kpi Value Api
 ###########################

+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
 def test_store_kpi_values(kpi_value_api_client):
     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
     response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
...
-# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
...
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import json
import logging
import threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from monitoring.service.NameMapping import NameMapping
from kpi_manager.service.KpiManagerService import KpiManagerService
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
class KpiValueWriter:
    @staticmethod
    def RunKafkaConsumer():
        thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
        ACTIVE_CONSUMERS.append(thread)
        thread.start()

    @staticmethod
    def KafkaConsumer():
        kafka_consumer = KafkaConsumer(
            { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
              'group.id'          : __class__.__name__,
              'auto.offset.reset' : 'latest'}
        )
        kafka_consumer.subscribe([KafkaTopic.VALUE.value])
        LOGGER.debug("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
        print("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
        while True:
            raw_kpi = kafka_consumer.poll(1.0)
            if raw_kpi is None:
                continue
            elif raw_kpi.error():
                if raw_kpi.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(raw_kpi.error()))
                    continue
            try:
                kpi_value = KpiValue()
                kpi_value.ParseFromString(raw_kpi.value())
                LOGGER.debug("Received KPI: {:}".format(kpi_value))
                print("Received KPI: {:}".format(kpi_value))
                KpiValueWriter.get_kpi_descriptor_from_db(kpi_value.kpi_id.kpi_id.uuid)
            except Exception as e:
                print("Error detail: {:}".format(e))
                continue
    @staticmethod
    def get_kpi_descriptor_from_db(kpi_value_uuid: str):
        print("--- START -----")
        kpi_id = KpiId()
        kpi_id.kpi_id.uuid = kpi_value_uuid
        print("KpiId generated: {:}".format(kpi_id))
        kpi_manager_client = KpiManagerClient()
        print("Kpi manager client created: {:}".format(kpi_manager_client))
        try:
            kpi_descriptor_object = KpiDescriptor()
            kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
            print("Kpi descriptor received: {:}".format(kpi_descriptor_object))
            if isinstance(kpi_descriptor_object, KpiDescriptor):
                LOGGER.debug("Extracted row: {:}".format(kpi_descriptor_object))
            else:
                LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object))
        except Exception as e:
            print("Unable to get Descriptor. Error: {:}".format(e))

def kpi_manager_service():
    LOGGER.info('Initializing KpiManagerService...')
    name_mapping = NameMapping()
    # _service = MonitoringService(name_mapping)
    _service = KpiManagerService(name_mapping)
    _service.start()

    # yield the service; when the test finishes, execution will resume to stop it
    LOGGER.info('Yielding KpiManagerService...')
    yield _service

    LOGGER.info('Terminating KpiManagerService...')
    _service.stop()
    LOGGER.info('Terminated KpiManagerService...')

def kpi_manager_client_a(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
    LOGGER.info('Initializing KpiManagerClient...')
    _client = KpiManagerClient()

    # yield the client; when the test finishes, execution will resume to close it
    LOGGER.info('Yielding KpiManagerClient...')
    yield _client

    LOGGER.info('Closing KpiManagerClient...')
    _client.close()
    LOGGER.info('Closed KpiManagerClient...')
\ No newline at end of file
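
The consumer loop in KpiValueWriter.KafkaConsumer() above runs forever and never closes the underlying Kafka consumer. A possible stoppable variant, sketched here under the assumption that a module-level threading.Event (not part of this commit) is acceptable:

    import threading
    from confluent_kafka import Consumer as KafkaConsumer
    from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
    from common.proto.kpi_value_api_pb2 import KpiValue

    STOP_EVENT = threading.Event()   # hypothetical stop flag

    def consume_until_stopped():
        consumer = KafkaConsumer({
            'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
            'group.id'          : 'kpi_value_writer',
            'auto.offset.reset' : 'latest',
        })
        consumer.subscribe([KafkaTopic.VALUE.value])
        try:
            while not STOP_EVENT.is_set():
                msg = consumer.poll(1.0)
                if msg is None or msg.error():
                    continue
                kpi_value = KpiValue()
                kpi_value.ParseFromString(msg.value())   # same parsing path as KpiValueWriter.KafkaConsumer()
        finally:
            consumer.close()   # leave the consumer group cleanly; STOP_EVENT.set() ends the loop
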
@@ -13,7 +13,18 @@
 # limitations under the License.

 import logging
-from kpi_manager.client.KpiManagerClient import KpiManagerClient
-from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
+from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
+from common.tools.kafka.Variables import KafkaTopic

 LOGGER = logging.getLogger(__name__)

+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
+
+def test_KafkaConsumer():
+    LOGGER.debug(" >>> test_KafkaConsumer: START <<< ")
+    KpiValueWriter.RunKafkaConsumer()
\ No newline at end of file
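
Taken together, the pieces added in this commit can be exercised end to end roughly as follows (a sketch, assuming a reachable Kafka broker at KafkaConfig.SERVER_IP and running KPI Manager / KPI Value API services, with the client resolving its host and port from the environment as in the test fixtures):

    from common.tools.kafka.Variables import KafkaTopic
    from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
    from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
    from kpi_value_api.tests.messages import create_kpi_value_list

    KafkaTopic.create_all_topics()        # make sure 'topic_value' and the other topics exist
    KpiValueWriter.RunKafkaConsumer()     # start consuming serialized KpiValue messages in a background thread
    client = KpiValueApiClient()          # assumes default host/port resolution, as in the test fixtures
    client.StoreKpiValues(create_kpi_value_list())   # gRPC call that produces the KPIs onto Kafka
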