# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import pytz
from datetime import datetime, timedelta
from threading import Thread, Event

from apscheduler.schedulers.background import BackgroundScheduler
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError

from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.proto.analytics_frontend_pb2 import Analyzer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.Streamer import DaskStreamer

LOGGER = logging.getLogger(__name__)


class AnalyticsBackendService(GenericGrpcService):
    """
    Listens for analyzer requests on the ANALYTICS_REQUEST Kafka topic and
    starts or stops the corresponding Dask streamers.
    """
    def __init__(self, cls_name : str = __name__) -> None:
        LOGGER.info('Init AnalyticsBackendService')
        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.scheduler = BackgroundScheduler(daemon=True)
        self.scheduler.start()
        self.running_threads = {}   # To keep track of all running analyzers
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'analytics-frontend',
                                             'auto.offset.reset' : 'latest'})
        Thread(target=self.RequestListener, args=()).start()

    def RequestListener(self):
        """
        Listens for analyzer requests on the ANALYTICS_REQUEST Kafka topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    break   # stop listening on an unrecoverable consumer error
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
                LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    self.StopStreamer(analyzer_uuid)
                else:
                    self.StartStreamer(analyzer_uuid, analyzer)
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(
                    KafkaTopic.ANALYTICS_REQUEST.value, e))
    def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
        """
        Start the DaskStreamer with the given parameters.
        """
        try:
            streamer = DaskStreamer(
                analyzer_uuid,
                analyzer['input_kpis' ],
                analyzer['output_kpis'],
                analyzer['thresholds' ],
                analyzer['batch_size' ],
                analyzer['window_size'],
            )
            # Schedule the streamer as a one-shot job that runs immediately
            self.scheduler.add_job(
                streamer.run,
                'date',
                run_date=datetime.now(pytz.utc),
                id=analyzer_uuid,
                replace_existing=True
            )
            LOGGER.info("Dask Streamer started.")
            return True
        except Exception as e:
            LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
            return False
    def StopStreamer(self, analyzer_uuid : str):
        """
        Stop the DaskStreamer with the given analyzer_uuid.
        """
        try:
            active_jobs = self.scheduler.get_jobs()
            LOGGER.debug("Active Jobs: {:}".format(active_jobs))
            if analyzer_uuid not in [job.id for job in active_jobs]:
                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                return False
            self.scheduler.remove_job(analyzer_uuid)
            LOGGER.info("Dask Streamer stopped.")
            return True
        except Exception as e:
            LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
            return False
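

# Minimal usage sketch (an assumption for illustration, not part of the original
# file): how this service could be started, relying on start()/stop() inherited
# from GenericGrpcService and on the usual TFS environment/configuration being in
# place (Kafka address, gRPC port settings, etc.).
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    service = AnalyticsBackendService()
    service.start()
    stop_event = Event()
    try:
        # Keep the main thread alive; the request listener runs in its own thread.
        while not stop_event.wait(timeout=1.0):
            pass
    except KeyboardInterrupt:
        service.stop()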