# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import json
import logging
import threading
from typing import Any, Dict

from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList
from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer

from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.Telemetry_DB import TelemetryDB
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC')
ACTIVE_COLLECTORS = [] # in-memory registry of running collector ids; could be populated from the DB at startup
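# NOTE (observation, not in the original): ACTIVE_COLLECTORS is mutated from
# gRPC handler threads and read from the listener thread. list.append/remove
# are atomic in CPython, but a threading.Lock (or a set) would make the
# concurrent access explicit.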
class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    def __init__(self):
        LOGGER.info('Init TelemetryFrontendService')
        self.tele_db_obj    = TelemetryDB()
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'frontend',
                                             'auto.offset.reset' : 'latest'})

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StartCollector(self,
                       request : Collector, grpc_context : grpc.ServicerContext # type: ignore
                      ) -> CollectorId: # type: ignore
        LOGGER.info("gRPC message: {:}".format(request))
        response = CollectorId()
        # TODO: Verify the presence of the KPI ID in the KPI DB, or assume that the KPI ID already exists?
        self.tele_db_obj.add_row_to_db(
            CollectorModel.ConvertCollectorToRow(request)
        )
        self.PublishStartRequestOnKafka(request)
        response.collector_id.uuid = request.collector_id.collector_id.uuid
        return response
    def PublishStartRequestOnKafka(self, collector_obj):
        """
        Method to generate a collector start request on Kafka.
        """
        collector_uuid = collector_obj.collector_id.collector_id.uuid
        collector_to_generate : Dict = {
            "kpi_id"  : collector_obj.kpi_id.kpi_id.uuid,
            "duration": collector_obj.duration_s,
            "interval": collector_obj.interval_s
        }
        self.kafka_producer.produce(
            KafkaTopic.REQUEST.value,
            key      = collector_uuid, # key carries the collector id so responses can be correlated
            value    = json.dumps(collector_to_generate),
            callback = self.delivery_callback
        )
        LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate))
        ACTIVE_COLLECTORS.append(collector_uuid)
        self.kafka_producer.flush()
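    # Illustrative REQUEST-topic message produced above (field values are
    # hypothetical; the shape follows collector_to_generate):
    #   key   : "4d2a...c9" (collector uuid)
    #   value : '{"kpi_id": "6e22...f1", "duration": 60.0, "interval": 1.0}'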
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StopCollector(self,
                      request : CollectorId, grpc_context : grpc.ServicerContext # type: ignore
                     ) -> Empty: # type: ignore
        LOGGER.info("gRPC message: {:}".format(request))
        try:
            collector_to_delete = request.collector_id.uuid
            self.tele_db_obj.delete_db_row_by_id(
                CollectorModel, "collector_id", collector_to_delete
            )
            self.PublishStopRequestOnKafka(request)
        except Exception as e:
            LOGGER.error('Unable to delete collector. Error: {:}'.format(e))
        return Empty()
    def PublishStopRequestOnKafka(self, collector_id):
        """
        Method to generate a stop collector request on Kafka.
        """
        collector_uuid = collector_id.collector_id.uuid
        collector_to_stop : Dict = {
            "kpi_id"  : collector_uuid,
            "duration": -1, # negative duration/interval act as the stop sentinel
            "interval": -1
        }
        self.kafka_producer.produce(
            KafkaTopic.REQUEST.value,
            key      = collector_uuid,
            value    = json.dumps(collector_to_stop),
            callback = self.delivery_callback
        )
        LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_stop))
        try:
            ACTIVE_COLLECTORS.remove(collector_uuid)
        except ValueError:
            LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid))
        self.kafka_producer.flush()
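    # Illustrative stop request produced above: key = collector uuid,
    # value = '{"kpi_id": "<uuid>", "duration": -1, "interval": -1}'.
    # The negative sentinels are presumably what triggers the backend to stop
    # the collector (see the (-1, -1) termination confirmation in
    # process_response below).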
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectCollectors(self,
                         request : CollectorFilter, grpc_context : grpc.ServicerContext # type: ignore
                        ) -> CollectorList: # type: ignore
        LOGGER.info("gRPC message: {:}".format(request))
        response = CollectorList()
        try:
            rows = self.tele_db_obj.select_with_filter(CollectorModel, request)
            for row in rows:
                collector_obj = CollectorModel.ConvertRowToCollector(row)
                response.collector_list.append(collector_obj)
        except Exception as e:
            LOGGER.error('Unable to apply filter on collectors. {:}'.format(e))
        return response
    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object, or None if delivery succeeded.
            msg (Message): Kafka message object.
        """
        if err:
            LOGGER.error('Message delivery failed: {:}'.format(err))
        # else:
        #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
    # ---------- Independent Method ---------------
    # The listener is independent of any RPC method (same lifetime as the
    # service) and continuously listens for responses.
    def RunResponseListener(self):
        # daemon=True (an addition) so the listener does not block process shutdown
        threading.Thread(target=self.ResponseListener, daemon=True).start()
        return True
    def ResponseListener(self):
        """
        Listener for collector responses on the Kafka RESPONSE topic.
        """
        self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value])
        while True:
            receive_msg = self.kafka_consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    break
            try:
                collector_id = receive_msg.key().decode('utf-8')
                if collector_id in ACTIVE_COLLECTORS:
                    kpi_value = json.loads(receive_msg.value().decode('utf-8'))
                    self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value'])
                else:
                    LOGGER.warning("Collector id does not match. Response ID: '{:}' --- Active IDs: '{:}'".format(collector_id, ACTIVE_COLLECTORS))
            except Exception as e:
                LOGGER.error("Error extracting msg key or value: {:}".format(e))
                continue
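    # Illustrative RESPONSE-topic message shape consumed above (values are
    # hypothetical; the 'kpi_id'/'kpi_value' keys follow the parsing logic):
    #   key   : b"4d2a...c9" (collector uuid)
    #   value : b'{"kpi_id": "6e22...f1", "kpi_value": 42}'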
    def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
        if kpi_id == "-1" and kpi_value == -1:
            # The backend echoes (-1, -1) to confirm that the collector terminated.
            LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id))
        else:
            LOGGER.info("KPI Value: Collector Id: {:}, Kpi Id: {:}, Value: {:}".format(collector_id, kpi_id, kpi_value))