"README.md" did not exist on "a36344f3f5ea2c31a25e3951cfe3ee432f4b9a0a"
Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import logging
import threading
from typing import Any, Dict
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
"""
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
self.active_jobs = {}   # collector_id -> threading.Event used to stop the collector thread
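# install_servicers() is expected to be invoked by GenericGrpcService on startup (assumption based on the
# base-class name); here it only spawns the Kafka request listener in a background thread.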
def install_servicers(self):
threading.Thread(target=self.RequestListener).start()
def RequestListener(self):
"""
"""
LOGGER.info('Telemetry backend request listener is running ...')
# print ('Telemetry backend request listener is running ...')
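# Poll loop: each message on TELEMETRY_REQUEST carries a JSON collector descriptor in the value
# and the collector_id in the key.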
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
while True:
receive_msg = consumer.poll(2.0)
if receive_msg is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.")
continue
else:
LOGGER.error("Consumer error: {}".format(receive_msg.error()))
break
try:
collector = json.loads(
receive_msg.value().decode('utf-8')
)
collector_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))
duration = collector.get('duration', -1)
if duration == -1 and collector['interval'] == -1:
self.TerminateCollector(collector_id)
else:
LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id))
if collector_id not in self.active_jobs:
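# Create a per-collector stop event so TerminateCollector() can end the handler thread cooperatively.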
stop_event = threading.Event()
self.active_jobs[collector_id] = stop_event
threading.Thread(target = self.CollectorHandler,
args=(
collector_id,
collector['kpi_id'],
duration,
collector['interval'],
stop_event
)).start()
# Stop the Collector after the given duration
if duration > 0:
def stop_after_duration():
time.sleep(duration)
LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}")
self.TerminateCollector(collector_id)
duration_thread = threading.Thread(
target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}"
)
duration_thread.start()
else:
LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
except Exception as e:
LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event):
"""
Method to handle a collector request: resolve the endpoints and device type for the KPI and dispatch the collector.
"""
end_points : list = self.get_endpoints_from_kpi_id(kpi_id)
if not end_points:
LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id))
return
device_type : str = self.get_device_type_from_kpi_id(kpi_id)
if device_type == "Unknown":
LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id))
return
if device_type == "EMU-Device":
LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
subscription = [collector_id, end_points, duration, interval]
self.EmulatedCollectorHandler(subscription, kpi_id, stop_event)
else:
LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))
def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event):
# EmulatedCollector
collector = EmulatedCollector(address="127.0.0.1", port=8000)
collector.Connect()
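# Actual sampling via collector.SubscribeState() is still commented out below (assumed work in progress);
# the loop currently only logs a heartbeat once per second until the stop event is set.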
while not stop_event.is_set():
# samples = collector.SubscribeState(subscription)
# LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples))
# self.GenerateKpiValue(job_id, kpi_id, samples)
LOGGER.info("Generating KPI Values ...")
time.sleep(1)
def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
"""
Method to write a KPI value on the VALUE Kafka topic.
"""
producer = self.kafka_producer
kpi_value : Dict = {
"time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()
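# Terminate a running collector job by setting its stop event and removing it from active_jobs.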
LOGGER.debug("Terminating collector backend...")
try:
if job_id not in self.active_jobs: # not job_ids:
# self.logger.warning(f"Active jobs: {self.active_jobs}")
LOGGER.warning(f"No active jobs found for {job_id}. It might have already terminated.")
else:
LOGGER.info(f"Terminating job: {job_id}")
stop_event = self.active_jobs.pop(job_id, None)
if stop_event:
stop_event.set()
LOGGER.info(f"Job {job_id} terminated.")
else:
LOGGER.warning(f"Job {job_id} not found in active jobs.")
except Exception:
LOGGER.exception("Error terminating job: {:}".format(job_id))

def get_endpoints_from_kpi_id(self, kpi_id: str) -> list:
"""
Method to get endpoints based on kpi_id.
"""
kpi_endpoints = {
'6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]},
'123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []},
'123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]},
}
return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else []
def get_device_type_from_kpi_id(self, kpi_id: str) -> str:
"""
Method to get device type based on kpi_id.
"""
kpi_device_types = {
"123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"},
"123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"},
"6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"},
}
return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown")
# def TerminateCollectorBackend(self, collector_id):
# LOGGER.debug("Terminating collector backend...")
# if collector_id in self.running_threads:
# thread = self.running_threads[collector_id]
# thread.stop()
# del self.running_threads[collector_id]
# LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
# self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
# else:
# LOGGER.warning('Backend collector {:} not found'.format(collector_id))
# def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
# """
# Method to write kpi Termination signal on TELEMETRY_RESPONSE Kafka topic
# """
# producer = self.kafka_producer
# kpi_value : Dict = {
# "kpi_id" : kpi_id,
# "kpi_value" : measured_kpi_value,
# }
# producer.produce(
# KafkaTopic.TELEMETRY_RESPONSE.value,
# key = collector_id,
# value = json.dumps(kpi_value),
# callback = self.delivery_callback
# )
# producer.flush()

def delivery_callback(self, err, msg):
"""
Callback to report Kafka message delivery status.
"""
if err:
LOGGER.error('Message delivery failed: {:s}'.format(str(err)))