# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import logging
import threading
from typing import Any, Dict
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
"""
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
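        # Kafka clients: a producer to publish KPI values/responses, and a consumer in
        # the 'backend' group that receives collector requests.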
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
        self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
        self.active_jobs = {}    # collector_id -> threading.Event used to stop the job
def install_servicers(self):
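        # Run the Kafka request listener in a background thread alongside the gRPC server.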
threading.Thread(target=self.RequestListener).start()
def RequestListener(self):
"""
"""
LOGGER.info('Telemetry backend request listener is running ...')
# print ('Telemetry backend request listener is running ...')
consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
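        # Expected request message shape (illustrative example, inferred from the parsing
        # below): key = collector_id (UUID string), value = JSON such as
        # {"kpi_id": "6e22f180-...", "duration": 10, "interval": 1}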
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
                elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                    LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or has no messages.")
                    continue
else:
LOGGER.error("Consumer error: {}".format(receive_msg.error()))
break
try:
collector = json.loads(
receive_msg.value().decode('utf-8')
)
collector_id = receive_msg.key().decode('utf-8')
                LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))

                duration = collector.get('duration', -1)
                if duration == -1 and collector['interval'] == -1:
self.TerminateCollector(collector_id)
else:
LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id))
if collector_id not in self.active_jobs:
stop_event = threading.Event()
self.active_jobs[collector_id] = stop_event
threading.Thread(target = self.CollectorHandler,
args=(
collector_id,
collector['kpi_id'],
duration,
collector['interval'],
stop_event
)).start()
# Stop the Collector after the given duration
if duration > 0:
def stop_after_duration(completion_time, stop_event):
time.sleep(completion_time)
if not stop_event.is_set():
LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
self.TerminateCollector(collector_id)
target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}",
args=(duration, stop_event)
)
duration_thread.start()
else:
LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
except Exception as e:
LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
    def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event):
        """
        Resolves the endpoints and device type for the KPI and dispatches the request
        to the matching handler.
        """
        end_points : dict = self.get_endpoints_from_kpi_id(kpi_id)
        if not end_points:
            LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id))
            return
        device_type : str = self.get_device_type_from_kpi_id(kpi_id)
        if device_type == "Unknown":
            LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id))
            return
if device_type == "EMU-Device":
LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
subscription = [collector_id, end_points, duration, interval]
self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event)
else:
LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))
    def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event):
        """
        Collects KPI samples from the emulated device until stop_event is set.
        """
        self.collector.Connect()
if not self.collector.SubscribeState(subscription):
LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
else:
while not stop_event.is_set():
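                # Poll the emulated device and push each batch of samples to the VALUE topic.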
samples = list(self.collector.GetState(duration=duration, blocking=True))
LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
self.GenerateKpiValue(collector_id, kpi_id, samples)
time.sleep(1)
self.collector.Disconnect()
# self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration.
    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Publishes a measured KPI value on the VALUE Kafka topic.
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value
        }
        producer.produce(
            KafkaTopic.VALUE.value,
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
producer.flush()
LOGGER.debug("Terminating collector backend...")
try:
if job_id not in self.active_jobs: # not job_ids:
# self.logger.warning(f"Active jobs: {self.active_jobs}")
self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.")
else:
LOGGER.info(f"Terminating job: {job_id}")
stop_event = self.active_jobs.pop(job_id, None)
if stop_event:
stop_event.set()
LOGGER.info(f"Job {job_id} terminated.")
if self.collector.UnsubscribeState(job_id):
LOGGER.info(f"Unsubscribed from collector: {job_id}")
else:
LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}")
else:
LOGGER.warning(f"Job {job_id} not found in active jobs.")
        except Exception:
            LOGGER.exception("Error terminating job: {:}".format(job_id))

# --- Mock Methods ---
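    # The lookups below are hard-coded stand-ins; in a full deployment, endpoint and
    # device-type resolution would presumably come from the controller's context data.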
def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict:
        """
        Method to get endpoints based on kpi_id.
        """
kpi_endpoints = {
'6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]},
'123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []},
'123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]},
        }
        return kpi_endpoints.get(kpi_id, {})
def get_device_type_from_kpi_id(self, kpi_id: str) -> str:
"""
Method to get device type based on kpi_id.
"""
kpi_device_types = {
"123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"},
"123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"},
"6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"},
}
return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown")
    def delivery_callback(self, err, msg):
        """
        Callback invoked by the Kafka producer with the delivery status of each message.
        """
        if err:
            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
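
# Minimal standalone run sketch (an illustrative assumption, not the project's real
# deployment entrypoint; 'start()' is assumed to be inherited from GenericGrpcService):
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    TelemetryBackendService().start()
    while True:
        time.sleep(3600)   # keep the main thread alive for the listener thread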