Skip to content
Snippets Groups Projects
Commit 9605c9a1 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry Frontend gRPC NBI Re-structuring

- Changed the column type of the start and end timestamps to Float.
- Added ConvertRowToCollector() in TelemetryModel.
- Renamed the class variable from "DBobj" to "tele_db_obj".
- Renamed the local variable from "collector_id" to "collector_uuid".
- Added PublishStopRequestOnKafka() to publish the stop collector request on Kafka.
- Improved the test files.
parent 97655a60
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -16,6 +16,7 @@ import logging ...@@ -16,6 +16,7 @@ import logging
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, String, Float from sqlalchemy import Column, String, Float
from sqlalchemy.orm import registry from sqlalchemy.orm import registry
from common.proto import telemetry_frontend_pb2
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -30,8 +31,8 @@ class Collector(Base): ...@@ -30,8 +31,8 @@ class Collector(Base):
kpi_id = Column(UUID(as_uuid=False), nullable=False) kpi_id = Column(UUID(as_uuid=False), nullable=False)
sampling_duration_s = Column(Float , nullable=False) sampling_duration_s = Column(Float , nullable=False)
sampling_interval_s = Column(Float , nullable=False) sampling_interval_s = Column(Float , nullable=False)
start_timestamp = Column(String , nullable=False) start_timestamp = Column(Float , nullable=False)
end_timestamp = Column(String , nullable=False) end_timestamp = Column(Float , nullable=False)
# helps in logging the information # helps in logging the information
def __repr__(self): def __repr__(self):
...@@ -42,7 +43,7 @@ class Collector(Base): ...@@ -42,7 +43,7 @@ class Collector(Base):
@classmethod @classmethod
def ConvertCollectorToRow(cls, request): def ConvertCollectorToRow(cls, request):
""" """
Create an instance of collector rows from a request object. Create an instance of Collector table rows from a request object.
Args: request: The request object containing collector gRPC message. Args: request: The request object containing collector gRPC message.
Returns: A row (an instance of Collector table) initialized with content of the request. Returns: A row (an instance of Collector table) initialized with content of the request.
""" """
...@@ -55,5 +56,18 @@ class Collector(Base): ...@@ -55,5 +56,18 @@ class Collector(Base):
end_timestamp = request.end_time.timestamp end_timestamp = request.end_time.timestamp
) )
@classmethod
def ConvertRowToCollector(cls, row):
    """
    Convert a Collector table row into a Collector gRPC message.

    Args:
        row: A Collector table instance (row) containing the data.
    Returns:
        A telemetry_frontend_pb2.Collector message initialized with the
        content of the row (NOT a dictionary — the previous docstring was
        misleading on this point).
    """
    response = telemetry_frontend_pb2.Collector()
    # UUID fields travel as plain strings in the proto message.
    response.collector_id.collector_id.uuid = row.collector_id
    response.kpi_id.kpi_id.uuid             = row.kpi_id
    # Column names differ from proto field names: sampling_* -> duration_s/interval_s.
    response.duration_s                     = row.sampling_duration_s
    response.interval_s                     = row.sampling_interval_s
    # Timestamps are stored as Float columns and map directly onto the
    # proto Timestamp.timestamp (seconds since epoch) field.
    response.start_time.timestamp           = row.start_timestamp
    response.end_time.timestamp             = row.end_timestamp
    return response
...@@ -121,13 +121,13 @@ class TelemetryDB: ...@@ -121,13 +121,13 @@ class TelemetryDB:
query = session.query(CollectorModel) query = session.query(CollectorModel)
# Apply filters based on the filter_object # Apply filters based on the filter_object
if filter_object.kpi_id: if filter_object.kpi_id:
query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
result = query.all() result = query.all()
# query should be added to return all rows
if result: if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else: else:
LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}") LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result return result
except Exception as e: except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
......
...@@ -34,18 +34,20 @@ from telemetry.database.Telemetry_DB import TelemetryDB ...@@ -34,18 +34,20 @@ from telemetry.database.Telemetry_DB import TelemetryDB
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC') METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC')
ACTIVE_COLLECTORS = [] ACTIVE_COLLECTORS = [] # keep and can be populated from DB
class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
def __init__(self): def __init__(self):
LOGGER.info('Init TelemetryFrontendService') LOGGER.info('Init TelemetryFrontendService')
self.DBobj = TelemetryDB() self.tele_db_obj = TelemetryDB()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value}) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value,
'group.id' : 'frontend', 'group.id' : 'frontend',
'auto.offset.reset' : 'latest'}) 'auto.offset.reset' : 'latest'})
# --->>> SECTION: StartCollector with all helper methods <<<---
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartCollector(self, def StartCollector(self,
request : Collector, grpc_context: grpc.ServicerContext # type: ignore request : Collector, grpc_context: grpc.ServicerContext # type: ignore
...@@ -54,7 +56,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ...@@ -54,7 +56,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
response = CollectorId() response = CollectorId()
# TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists. # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists.
self.DBobj.add_row_to_db( self.tele_db_obj.add_row_to_db(
CollectorModel.ConvertCollectorToRow(request) CollectorModel.ConvertCollectorToRow(request)
) )
self.PublishRequestOnKafka(request) self.PublishRequestOnKafka(request)
...@@ -66,7 +68,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ...@@ -66,7 +68,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
""" """
Method to generate collector request on Kafka. Method to generate collector request on Kafka.
""" """
collector_id = collector_obj.collector_id.collector_id.uuid collector_uuid = collector_obj.collector_id.collector_id.uuid
collector_to_generate : Tuple [str, int, int] = ( collector_to_generate : Tuple [str, int, int] = (
collector_obj.kpi_id.kpi_id.uuid, collector_obj.kpi_id.kpi_id.uuid,
collector_obj.duration_s, collector_obj.duration_s,
...@@ -74,12 +76,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ...@@ -74,12 +76,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
) )
self.kafka_producer.produce( self.kafka_producer.produce(
KafkaTopic.REQUEST.value, KafkaTopic.REQUEST.value,
key = collector_id, key = collector_uuid,
value = str(collector_to_generate), value = str(collector_to_generate),
callback = self.delivery_callback callback = self.delivery_callback
) )
LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_id, collector_to_generate)) LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate))
ACTIVE_COLLECTORS.append(collector_id) ACTIVE_COLLECTORS.append(collector_uuid)
self.kafka_producer.flush() self.kafka_producer.flush()
def run_kafka_listener(self): def run_kafka_listener(self):
...@@ -141,39 +143,59 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ...@@ -141,39 +143,59 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
msg (Message): Kafka message object. msg (Message): Kafka message object.
""" """
if err: if err:
print(f'Message delivery failed: {err}') LOGGER.debug('Message delivery failed: {:}'.format(err))
print('Message delivery failed: {:}'.format(err))
else: else:
print(f'Message delivered to topic {msg.topic()}') LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
# <<<--- SECTION: StopCollector with all helper methods --->>>
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopCollector(self, def StopCollector(self,
request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore ) -> Empty: # type: ignore
LOGGER.info ("gRPC message: {:}".format(request)) LOGGER.info ("gRPC message: {:}".format(request))
_collector_id = request.collector_id.uuid self.PublishStopRequestOnKafka(request)
self.publish_to_kafka_request_topic(_collector_id, "", -1, -1)
return Empty() return Empty()
def PublishStopRequestOnKafka(self, collector_id):
    """
    Publish a stop-collector request on the Kafka request topic.

    A stop request is encoded as a (collector_uuid, -1, -1) tuple; the
    negative duration/interval values mark it as a stop (not start) request.
    Also removes the collector from the in-memory active-collector list.
    """
    collector_uuid = collector_id.collector_id.uuid
    stop_payload : Tuple[str, int, int] = (collector_uuid, -1, -1)
    # Keyed by collector UUID so all requests for one collector share a partition.
    self.kafka_producer.produce(
        KafkaTopic.REQUEST.value,
        key      = collector_uuid,
        value    = str(stop_payload),
        callback = self.delivery_callback
    )
    LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, stop_payload))
    # EAFP: attempt removal and warn if the collector was never registered.
    try:
        ACTIVE_COLLECTORS.remove(collector_uuid)
    except ValueError:
        LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid))
    self.kafka_producer.flush()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectCollectors(self, def SelectCollectors(self,
request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
) -> CollectorList: # type: ignore ) -> CollectorList: # type: ignore
LOGGER.info("gRPC message: {:}".format(request)) LOGGER.info("gRPC message: {:}".format(request))
response = CollectorList() response = CollectorList()
filter_to_apply = dict()
filter_to_apply['kpi_id'] = request.kpi_id[0].kpi_id.uuid
# filter_to_apply['duration_s'] = request.duration_s[0]
try: try:
rows = self.managementDBobj.select_with_filter(CollectorModel, **filter_to_apply) rows = self.tele_db_obj.select_with_filter(CollectorModel, request)
except Exception as e: except Exception as e:
LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e))
try: try:
if len(rows) != 0: for row in rows:
for row in rows: collector_obj = CollectorModel.ConvertRowToCollector(row)
collector_obj = Collector() response.collector_list.append(collector_obj)
collector_obj.collector_id.collector_id.uuid = row.collector_id
response.collector_list.append(collector_obj)
return response return response
except Exception as e: except Exception as e:
LOGGER.info('Unable to process response {:}'.format(e)) LOGGER.info('Unable to process filter response {:}'.format(e))
...@@ -16,67 +16,27 @@ import uuid ...@@ -16,67 +16,27 @@ import uuid
import random import random
from common.proto import telemetry_frontend_pb2 from common.proto import telemetry_frontend_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiId
# ----------------------- "2nd" Iteration -------------------------------- # ----------------------- "2nd" Iteration --------------------------------
def create_collector_id(): def create_collector_id():
_collector_id = telemetry_frontend_pb2.CollectorId() _collector_id = telemetry_frontend_pb2.CollectorId()
_collector_id.collector_id.uuid = uuid.uuid4() # _collector_id.collector_id.uuid = str(uuid.uuid4())
_collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c"
return _collector_id return _collector_id
# def create_collector_id_a(coll_id_str : str):
# _collector_id = telemetry_frontend_pb2.CollectorId()
# _collector_id.collector_id.uuid = str(coll_id_str)
# return _collector_id
def create_collector_request(): def create_collector_request():
_create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request = telemetry_frontend_pb2.Collector()
_create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
_create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _create_collector_request.collector = "collector description"
_create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.duration_s = float(random.randint(8, 16))
_create_collector_request.interval_s = float(random.randint(2, 4)) _create_collector_request.interval_s = float(random.randint(2, 4))
return _create_collector_request return _create_collector_request
def create_collector_filter(): def create_collector_filter():
_create_collector_filter = telemetry_frontend_pb2.CollectorFilter() _create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
new_kpi_id = _create_collector_filter.kpi_id.add() kpi_id_obj = KpiId()
new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f"
_create_collector_filter.kpi_id.append(kpi_id_obj)
return _create_collector_filter return _create_collector_filter
# ----------------------- "First" Iteration --------------------------------
# def create_collector_request_a():
# _create_collector_request_a = telemetry_frontend_pb2.Collector()
# _create_collector_request_a.collector_id.collector_id.uuid = "-1"
# return _create_collector_request_a
# def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s
# ) -> telemetry_frontend_pb2.Collector:
# _create_collector_request_b = telemetry_frontend_pb2.Collector()
# _create_collector_request_b.collector_id.collector_id.uuid = '1'
# _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id
# _create_collector_request_b.duration_s = coll_duration_s
# _create_collector_request_b.interval_s = coll_interval_s
# return _create_collector_request_b
# def create_collector_filter():
# _create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
# new_collector_id = _create_collector_filter.collector_id.add()
# new_collector_id.collector_id.uuid = "COLL1"
# new_kpi_id = _create_collector_filter.kpi_id.add()
# new_kpi_id.kpi_id.uuid = "KPI1"
# new_device_id = _create_collector_filter.device_id.add()
# new_device_id.device_uuid.uuid = 'DEV1'
# new_service_id = _create_collector_filter.service_id.add()
# new_service_id.service_uuid.uuid = 'SERV1'
# new_slice_id = _create_collector_filter.slice_id.add()
# new_slice_id.slice_uuid.uuid = 'SLC1'
# new_endpoint_id = _create_collector_filter.endpoint_id.add()
# new_endpoint_id.endpoint_uuid.uuid = 'END1'
# new_connection_id = _create_collector_filter.connection_id.add()
# new_connection_id.connection_uuid.uuid = 'CON1'
# _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED)
# return _create_collector_filter
# def create_collector_list():
# _create_collector_list = telemetry_frontend_pb2.CollectorList()
# return _create_collector_list
\ No newline at end of file
...@@ -19,6 +19,7 @@ import logging ...@@ -19,6 +19,7 @@ import logging
# from common.proto.context_pb2 import Empty # from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList
from common.proto.context_pb2 import Empty
from common.Settings import ( from common.Settings import (
get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC) get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
...@@ -26,7 +27,7 @@ from common.Settings import ( ...@@ -26,7 +27,7 @@ from common.Settings import (
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.tests.Messages import ( from telemetry.frontend.tests.Messages import (
create_collector_request, create_collector_filter) create_collector_request, create_collector_id, create_collector_filter)
########################### ###########################
...@@ -85,6 +86,18 @@ def test_StartCollector(telemetryFrontend_client): ...@@ -85,6 +86,18 @@ def test_StartCollector(telemetryFrontend_client):
LOGGER.debug(str(response)) LOGGER.debug(str(response))
assert isinstance(response, CollectorId) assert isinstance(response, CollectorId)
def test_StopCollector(telemetryFrontend_client):
    # Stopping a collector must return an Empty acknowledgement message.
    LOGGER.info(' >>> test_StopCollector START: <<< ')
    reply = telemetryFrontend_client.StopCollector(create_collector_id())
    LOGGER.debug(str(reply))
    assert isinstance(reply, Empty)
def test_SelectCollectors(telemetryFrontend_client):
    # Filtering collectors must return a CollectorList message (possibly empty).
    LOGGER.info(' >>> test_SelectCollectors START: <<< ')
    reply = telemetryFrontend_client.SelectCollectors(create_collector_filter())
    LOGGER.debug(str(reply))
    assert isinstance(reply, CollectorList)
# ------- previous test ---------------- # ------- previous test ----------------
# def test_verify_db_and_table(): # def test_verify_db_and_table():
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment