Loading my_deploy.sh +1 −1 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring kpi_manager" #export TFS_COMPONENTS="${TFS_COMPONENTS} forecaster" # Uncomment to activate E2E Orchestrator export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" Loading scripts/run_tests_locally-kpi-value-API.sh +3 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py No newline at end of file src/common/Constants.py +3 −3 Original line number Diff line number Diff line Loading @@ -43,9 +43,9 @@ class ServiceNameEnum(Enum): ZTP = 'ztp' POLICY = 'policy' MONITORING = 'monitoring' KPIMANAGER = 'kpi_manager' KPIVALUEAPI = 'kpi_value_api' TELEMETRYFRONTEND = 'telemetry_frontend' KPIMANAGER = 'kpi-manager' KPIVALUEAPI = 'kpi-value-api' TELEMETRYFRONTEND = 'telemetry-frontend' DLT = 'dlt' NBI = 'nbi' CYBERSECURITY = 'cybersecurity' Loading src/kpi_manager/client/KpiManagerClient.py +2 −0 Original line number Diff line number Diff line Loading @@ -63,8 +63,10 @@ class KpiManagerClient: @RETRY_DECORATOR def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: print('---> GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetKpiDescriptor(request) print('---> GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response Loading src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +61 −7 Original line number Diff line number Diff line Loading @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, grpc, json import logging, grpc, json, requests from typing import Tuple, Any, List, Dict from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType from confluent_kafka import Producer as KafkaProducer Loading @@ -28,6 +29,7 @@ from .NameMapping import NameMapping LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') PROM_URL = "http://localhost:9090" class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self, name_mapping : NameMapping): Loading @@ -49,7 +51,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? ) LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-KpiValueApi" # str(__class__.__name__) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used # write this KPI to Kafka producer_obj.produce( Loading @@ -60,14 +62,66 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): callback = self.delivery_callback ) producer_obj.flush() return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext ) -> KpiValueList: LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request)) LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) response = KpiValueList() metrics = [kpi.kpi_id for kpi in request.kpi_id] start_timestamps = [timestamp for timestamp in request.start_timestamp] end_timestamps = [timestamp for timestamp in request.end_timestamp] results = [] for start, end in zip(start_timestamps, end_timestamps): start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z" end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z" for metric in metrics: url = f'{PROM_URL}/api/v1/query_range' params = { 'query': metric, 'start': start_str, 'end' : end_str, 'step' : '30s' # or any other step you need } response = requests.get(url, params=params) if response.status_code == 200: data = response.json() for result in data['data']['result']: for value in result['values']: kpi_value = KpiValue( kpi_id=metric, timestamp=str(seconds=value[0]), kpi_value_type=self._convert_value_to_kpi_value_type(value[1]) ) results.append(kpi_value) def _convert_value_to_kpi_value_type(self, value): # Check if the value is an integer (int64) try: int64_value = int(value) return KpiValueType(int64Val=int64_value) except ValueError: pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) except ValueError: pass # Check if the value is a boolean if value.lower() in ['true', 'false']: bool_value = value.lower() == 'true' return KpiValueType(boolVal=bool_value) # If none of the above, treat it as a string return KpiValueType(stringVal=value) def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: print('Message delivered to topic {:}'.format(msg.topic())) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) Loading
my_deploy.sh +1 −1 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring kpi_manager" #export TFS_COMPONENTS="${TFS_COMPONENTS} forecaster" # Uncomment to activate E2E Orchestrator export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" Loading
scripts/run_tests_locally-kpi-value-API.sh +3 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py No newline at end of file
src/common/Constants.py +3 −3 Original line number Diff line number Diff line Loading @@ -43,9 +43,9 @@ class ServiceNameEnum(Enum): ZTP = 'ztp' POLICY = 'policy' MONITORING = 'monitoring' KPIMANAGER = 'kpi_manager' KPIVALUEAPI = 'kpi_value_api' TELEMETRYFRONTEND = 'telemetry_frontend' KPIMANAGER = 'kpi-manager' KPIVALUEAPI = 'kpi-value-api' TELEMETRYFRONTEND = 'telemetry-frontend' DLT = 'dlt' NBI = 'nbi' CYBERSECURITY = 'cybersecurity' Loading
src/kpi_manager/client/KpiManagerClient.py +2 −0 Original line number Diff line number Diff line Loading @@ -63,8 +63,10 @@ class KpiManagerClient: @RETRY_DECORATOR def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: print('---> GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetKpiDescriptor(request) print('---> GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response Loading
src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +61 −7 Original line number Diff line number Diff line Loading @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, grpc, json import logging, grpc, json, requests from typing import Tuple, Any, List, Dict from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType from confluent_kafka import Producer as KafkaProducer Loading @@ -28,6 +29,7 @@ from .NameMapping import NameMapping LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') PROM_URL = "http://localhost:9090" class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self, name_mapping : NameMapping): Loading @@ -49,7 +51,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? ) LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-KpiValueApi" # str(__class__.__name__) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used # write this KPI to Kafka producer_obj.produce( Loading @@ -60,14 +62,66 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): callback = self.delivery_callback ) producer_obj.flush() return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext ) -> KpiValueList: LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request)) LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) response = KpiValueList() metrics = [kpi.kpi_id for kpi in request.kpi_id] start_timestamps = [timestamp for timestamp in request.start_timestamp] end_timestamps = [timestamp for timestamp in request.end_timestamp] results = [] for start, end in zip(start_timestamps, end_timestamps): start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z" end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z" for metric in metrics: url = f'{PROM_URL}/api/v1/query_range' params = { 'query': metric, 'start': start_str, 'end' : end_str, 'step' : '30s' # or any other step you need } response = requests.get(url, params=params) if response.status_code == 200: data = response.json() for result in data['data']['result']: for value in result['values']: kpi_value = KpiValue( kpi_id=metric, timestamp=str(seconds=value[0]), kpi_value_type=self._convert_value_to_kpi_value_type(value[1]) ) results.append(kpi_value) def _convert_value_to_kpi_value_type(self, value): # Check if the value is an integer (int64) try: int64_value = int(value) return KpiValueType(int64Val=int64_value) except ValueError: pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) except ValueError: pass # Check if the value is a boolean if value.lower() in ['true', 'false']: bool_value = value.lower() == 'true' return KpiValueType(boolVal=bool_value) # If none of the above, treat it as a string return KpiValueType(stringVal=value) def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: print('Message delivered to topic {:}'.format(msg.topic())) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))