Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import threading
from typing import Any, Dict
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
"""
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
self.running_threads = {}
self.emulatorCollector = None
self.metric_queue = queue.Queue()
def install_servicers(self):
threading.Thread(target=self.RequestListener).start()
def RequestListener(self):
"""
"""
LOGGER.info('Telemetry backend request listener is running ...')
# print ('Telemetry backend request listener is running ...')
consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
while True:
if receive_msg is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
LOGGER.error("Consumer error: {}".format(receive_msg.error()))
break
try:
collector = json.loads(receive_msg.value().decode('utf-8'))
collector_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
if collector['duration'] == -1 and collector['interval'] == -1:
self.TerminateCollectorBackend(collector_id)
else:
threading.Thread(target=self.InitiateCollectorBackend,
args=(collector_id, collector)).start()
except Exception as e:
LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
def InitiateCollectorBackend(self, collector_id, collector):
Method receives collector request and initiates collecter backend.

Waleed Akbar
committed
LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id)))
# start_time = time.time()
# self.emulatorCollector = NetworkMetricsEmulator(
# duration = collector['duration'],
# interval = collector['interval'],
# metric_queue = self.metric_queue
# )
# self.emulatorCollector.start()
# self.running_threads[collector_id] = self.emulatorCollector

Waleed Akbar
committed

Waleed Akbar
committed
# while self.emulatorCollector.is_alive():
# if not self.metric_queue.empty():
# metric_value = self.metric_queue.get()
# LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
# self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value)
# time.sleep(1)
# self.TerminateCollectorBackend(collector_id)
def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
Method to write kpi value on VALUE Kafka topic
"""
producer = self.kafka_producer
kpi_value : Dict = {
"time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()
def TerminateCollectorBackend(self, collector_id):
LOGGER.debug("Terminating collector backend...")
if collector_id in self.running_threads:
thread = self.running_threads[collector_id]
thread.stop()
del self.running_threads[collector_id]
LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
else:
LOGGER.warning('Backend collector {:} not found'.format(collector_id))

Waleed Akbar
committed
def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):

Waleed Akbar
committed
"""
Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic

Waleed Akbar
committed
"""
producer = self.kafka_producer
kpi_value : Dict = {
"kpi_id" : kpi_id,
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()

Waleed Akbar
committed
if err:
LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
# print(f'Message delivery failed: {err}')
# else:
# LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
# print(f'Message delivered to topic {msg.topic()}')