Commit 1ed5b830 authored by Lluis Gifre Renom

Monitoring component:

- Migrated to use new generic gRPC servicer and clients
- Migrated to use new settings framework
- Improved definition of unitary tests and solved some bugs
- Minor code formatting/styling
parent 6a1bf009
1 merge request: !54 Release 2.0.0
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from monitoring.proto.context_pb2 import Empty
from monitoring.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest
from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceStub

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class MonitoringClient:
    def __init__(self, host=None, port=None):
        if not host: host = get_service_host(ServiceNameEnum.MONITORING)
        if not port: port = get_service_port_grpc(ServiceNameEnum.MONITORING)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = MonitoringServiceStub(self.channel)

    def close(self):
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def CreateKpi(self, request : KpiDescriptor) -> KpiId:
        LOGGER.debug('CreateKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.CreateKpi(request)
        LOGGER.debug('CreateKpi result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor:
        LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetKpiDescriptor(request)
        LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def IncludeKpi(self, request : Kpi) -> Empty:
        LOGGER.debug('IncludeKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.IncludeKpi(request)
        LOGGER.debug('IncludeKpi result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def MonitorKpi(self, request : MonitorKpiRequest) -> Empty:
        LOGGER.debug('MonitorKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.MonitorKpi(request)
        LOGGER.debug('MonitorKpi result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetStreamKpi(self, request : KpiId) -> Iterator[Kpi]:
        LOGGER.debug('GetStreamKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetStreamKpi(request)
        LOGGER.debug('GetStreamKpi result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetInstantKpi(self, request : KpiId) -> Kpi:
        LOGGER.debug('GetInstantKpi: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetInstantKpi(request)
        LOGGER.debug('GetInstantKpi result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

if __name__ == '__main__':
    import sys
    # get port
    _port = sys.argv[1] if len(sys.argv) > 1 else '7070'
    # make call to server
    client = MonitoringClient(port=_port)
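For illustration, a minimal usage sketch of this client follows. The endpoint and KPI field values are assumptions for the example (they mirror the unit-test message factories further below), not part of the commit itself.

from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.proto import monitoring_pb2
from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType

# Assumed endpoint; by default the client resolves host/port through the settings framework.
client = MonitoringClient(host='127.0.0.1', port='7070')

# Describe and register a KPI; the service returns its KpiId.
descriptor = monitoring_pb2.KpiDescriptor()
descriptor.kpi_description = 'Transmitted packets on DEV1/END1'
descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
descriptor.device_id.device_uuid.uuid = 'DEV1'
descriptor.endpoint_id.endpoint_uuid.uuid = 'END1'
descriptor.service_id.service_uuid.uuid = 'SERV1'
kpi_id = client.CreateKpi(descriptor)

# Request periodic sampling of the KPI: one day, every 30 seconds.
monitor_request = monitoring_pb2.MonitorKpiRequest()
monitor_request.kpi_id.CopyFrom(kpi_id)
monitor_request.sampling_duration_s = 86400
monitor_request.sampling_interval_s = 30
client.MonitorKpi(monitor_request)

client.close()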
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import grpc

from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from monitoring.proto import context_pb2

from common.logger import getJSONLogger
LOGGER = getJSONLogger('monitoring-client')
LOGGER.setLevel('DEBUG')

class MonitoringClient:
    def __init__(self, server='monitoring', port='7070'):
        endpoint = '{}:{}'.format(server, port)
        LOGGER.info('init monitoringClient {}'.format(endpoint))
        self.channel = grpc.insecure_channel(endpoint)
        self.server = monitoring_pb2_grpc.MonitoringServiceStub(self.channel)

    def CreateKpi(self, request):
        LOGGER.info('CreateKpi: {}'.format(request))
        response = self.server.CreateKpi(request)
        LOGGER.info('CreateKpi result: {}'.format(response))
        return response

    def MonitorKpi(self, request):
        LOGGER.info('MonitorKpi: {}'.format(request))
        response = self.server.MonitorKpi(request)
        LOGGER.info('MonitorKpi result: {}'.format(response))
        return response

    def IncludeKpi(self, request):
        LOGGER.info('IncludeKpi: {}'.format(request))
        response = self.server.IncludeKpi(request)
        LOGGER.info('IncludeKpi result: {}'.format(response))
        return response

    def GetStreamKpi(self, request):
        LOGGER.info('GetStreamKpi: {}'.format(request))
        response = self.server.GetStreamKpi(request)
        LOGGER.info('GetStreamKpi result: {}'.format(response))
        yield monitoring_pb2.Kpi()

    def GetInstantKpi(self, request):
        LOGGER.info('GetInstantKpi: {}'.format(request))
        response = self.server.GetInstantKpi(request)
        LOGGER.info('GetInstantKpi result: {}'.format(response))
        return monitoring_pb2.Kpi()

    def GetKpiDescriptor(self, request):
        LOGGER.info('GetKpiDescriptor: {}'.format(request))
        response = self.server.GetKpiDescriptor(request)
        LOGGER.info('GetKpiDescriptor result: {}'.format(response))
        return response

if __name__ == '__main__':
    # get port
    port = sys.argv[1] if len(sys.argv) > 1 else '7070'
    # make call to server
    client = MonitoringClient(port=port)
@@ -19,26 +19,27 @@ import grpc
 from common.rpc_method_wrapper.ServiceExceptions import ServiceException
 from context.client.ContextClient import ContextClient
-from context.proto import kpi_sample_types_pb2
+#from context.proto import kpi_sample_types_pb2
 from context.proto.context_pb2 import Empty, EventTypeEnum
 from common.logger import getJSONLogger
-from monitoring.client.monitoring_client import MonitoringClient
+from monitoring.client.MonitoringClient import MonitoringClient
 from monitoring.proto import monitoring_pb2
 
 LOGGER = getJSONLogger('monitoringservice-server')
 LOGGER.setLevel('DEBUG')
 
 class EventsDeviceCollector:
-    def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name
+    def __init__(self) -> None: # pylint: disable=redefined-outer-name
         self._events_queue = Queue()
 
-        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
-        self._context_client = context_client_grpc
-        self._channel = context_client_grpc.channel
-        self._monitoring_client = monitoring_client_grpc
+        self._context_client_grpc = ContextClient()
+        self._device_stream = self._context_client_grpc.GetDeviceEvents(Empty())
+        self._context_client = self._context_client_grpc
+        self._channel = self._context_client_grpc.channel
+        self._monitoring_client = MonitoringClient(host='127.0.0.1')
 
-        self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False)
+        self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)
 
     def grpc_server_on(self):
         try:
...
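The _collect method referenced by the thread above is outside this hunk. As a rough, hypothetical sketch (not the exact code of this commit), it simply drains the Context device-event stream into the internal queue that start_monitoring() in __main__.py later consumes:

    # Hypothetical sketch of the collector loop; the error handling is an assumption.
    def _collect(self, events_stream):
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            # The stream ends with CANCELLED when the collector is stopped.
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise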
@@ -12,63 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from concurrent import futures
-
-import grpc, logging
-
-from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
-from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
-from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
-
-from grpc_health.v1 import health
-from grpc_health.v1 import health_pb2
-from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
-
-from common.logger import getJSONLogger
-LOGGER = getJSONLogger('monitoring-server')
-
-BIND_ADDRESS = '0.0.0.0'
-
-class MonitoringService:
-    def __init__(self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
-                 grace_period=GRPC_GRACE_PERIOD):
-        self.address = address
-        self.port = port
-        self.endpoint = None
-        self.max_workers = max_workers
-        self.grace_period = grace_period
-        self.monitoring_servicer = None
-        self.health_servicer = None
-        self.pool = None
-        self.server = None
-
-    def start(self):
-        # create gRPC server
-        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers)) # ,interceptors=(tracer_interceptor,))
-
-        # add monitoring servicer class to gRPC server
-        self.monitoring_servicer = MonitoringServiceServicerImpl()
-        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
-
-        # add gRPC health checker servicer class to gRPC server
-        self.health_servicer = health.HealthServicer(
-            experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
-        add_HealthServicer_to_server(self.health_servicer, self.server)
-
-        # start server
-        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
-        LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
-            str(self.endpoint), str(self.max_workers)))
-        self.server.add_insecure_port(self.endpoint)
-        self.server.start()
-        self.health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
-
-        LOGGER.debug('Service started')
-
-    def stop(self):
-        LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period))
-        self.health_servicer.enter_graceful_shutdown()
-        self.server.stop(self.grace_period)
-        LOGGER.debug('Service stopped')
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_port_grpc
+from common.tools.service.GenericGrpcService import GenericGrpcService
+from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
+from .MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
+
+class MonitoringService(GenericGrpcService):
+    def __init__(self, cls_name: str = __name__) -> None:
+        port = get_service_port_grpc(ServiceNameEnum.MONITORING)
+        super().__init__(port, cls_name=cls_name)
+        self.monitoring_servicer = MonitoringServiceServicerImpl()
+
+    def install_servicers(self):
+        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
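The gRPC boilerplate removed above (server creation, port binding, health servicer, start/stop) now lives in the shared GenericGrpcService base class. Its actual implementation is not part of this hunk; the following is only a hypothetical sketch of the contract MonitoringService relies on here: a constructor taking the bind port, a self.server attribute, an install_servicers() hook, and start()/stop() methods.

# Hypothetical sketch of a GenericGrpcService-like base class; the real
# common.tools.service.GenericGrpcService may differ in its details.
from concurrent import futures
import grpc

class GenericGrpcServiceSketch:
    def __init__(self, bind_port, bind_address='0.0.0.0', max_workers=10, grace_period=60, cls_name=__name__):
        self.bind_port = bind_port
        self.bind_address = bind_address
        self.max_workers = max_workers
        self.grace_period = grace_period
        self.cls_name = cls_name
        self.endpoint = None
        self.server = None

    def install_servicers(self):
        # Subclasses (e.g. MonitoringService) register their servicers on self.server here.
        raise NotImplementedError()

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers))
        self.install_servicers()
        self.server.add_insecure_port(self.endpoint)
        self.server.start()

    def stop(self):
        if self.server is not None: self.server.stop(self.grace_period)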
@@ -12,29 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os,grpc, logging
-import socket
-
-from prometheus_client import Summary
-from prometheus_client import Counter
-from common.Settings import get_setting
-from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST
+import os, grpc, logging
+
+from prometheus_client import Counter, Summary
 from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType
 from monitoring.service import SqliteTools, InfluxTools
 from monitoring.proto import monitoring_pb2
 from monitoring.proto import monitoring_pb2_grpc
 
 from common.rpc_method_wrapper.ServiceExceptions import ServiceException
 from context.proto import context_pb2
 
 from device.client.DeviceClient import DeviceClient
 from device.proto import device_pb2
 
 LOGGER = logging.getLogger(__name__)
 
-MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
+MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
+    'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
 MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
 
 INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
@@ -42,9 +34,6 @@ INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
 INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
 INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
 
-DEVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST )
-DEVICE_SERVICE_PORT = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT)
-
 class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
     def __init__(self):
@@ -52,13 +41,14 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
         # Init sqlite monitoring db
         self.sql_db = SqliteTools.SQLite('monitoring.db')
 
-        self.deviceClient = DeviceClient(address=DEVICE_SERVICE_HOST, port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client
-
         # Create influx_db client
         self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
 
     # CreateKpi (CreateKpiRequest) returns (KpiId) {}
-    def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
+    def CreateKpi(
+        self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext
+    ) -> monitoring_pb2.KpiId:
         # CREATEKPI_COUNTER_STARTED.inc()
         LOGGER.info('CreateKpi')
         try:
@@ -71,7 +61,8 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
             kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
             kpi_service_id = request.service_id.service_uuid.uuid
 
-            data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
+            data = self.sql_db.insert_KPI(
+                kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
             kpi_id.kpi_id.uuid = str(data)
@@ -87,7 +78,9 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
             grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
 
     # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
-    def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
+    def MonitorKpi(
+        self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext
+    ) -> context_pb2.Empty:
         LOGGER.info('MonitorKpi')
         try:
@@ -97,25 +90,23 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
             kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
 
             monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
             monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
             monitor_device_request.sampling_duration_s = request.sampling_duration_s
             monitor_device_request.sampling_interval_s = request.sampling_interval_s
 
-            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            if s.connect_ex((DEVICE_SERVICE_HOST, DEVICE_GRPC_SERVICE_PORT)) == 0:
-                self.deviceClient.MonitorDeviceKpi(monitor_device_request)
-            else:
-                LOGGER.warning('Device service is not reachable')
-            return context_pb2.Empty()
+            device_client = DeviceClient()
+            device_client.MonitorDeviceKpi(monitor_device_request)
         except ServiceException as e:
             LOGGER.exception('MonitorKpi exception')
             # CREATEKPI_COUNTER_FAILED.inc()
             grpc_context.abort(e.code, e.details)
         except Exception as e: # pragma: no cover
             LOGGER.exception('MonitorKpi exception')
+            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
             # CREATEKPI_COUNTER_FAILED.inc()
+        return context_pb2.Empty()
 
     # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
     def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
@@ -145,7 +136,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
             LOGGER.exception('IncludeKpi exception')
             # CREATEKPI_COUNTER_FAILED.inc()
             grpc_context.abort(e.code, e.details)
-        except Exception as e: # pragma: no cover
+        except Exception: # pragma: no cover
             LOGGER.exception('IncludeKpi exception')
             # CREATEKPI_COUNTER_FAILED.inc()
         return context_pb2.Empty()
@@ -162,7 +153,9 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
         return monitoring_pb2.Kpi()
 
-    def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
+    def GetKpiDescriptor(
+        self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext
+    ) -> monitoring_pb2.KpiDescriptor:
         LOGGER.info('getting Kpi by KpiID')
         try:
             kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid))
@@ -183,5 +176,5 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
             LOGGER.exception('GetKpiDescriptor exception')
             grpc_context.abort(e.code, e.details)
-        except Exception as e: # pragma: no cover
+        except Exception: # pragma: no cover
             LOGGER.exception('GetKpiDescriptor exception')
@@ -12,44 +12,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging, signal, sys, threading, socket
-from common.Settings import get_setting, wait_for_environment_variables
-from context.client.ContextClient import ContextClient
-from monitoring.Config import (
-    GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT, CONTEXT_GRPC_SERVICE_PORT,
-    CONTEXT_SERVICE_HOST)
-from monitoring.client.monitoring_client import MonitoringClient
-from monitoring.proto import monitoring_pb2
-from monitoring.service.EventTools import EventsDeviceCollector
-from monitoring.service.MonitoringService import MonitoringService
+import logging, signal, sys, threading
 from prometheus_client import start_http_server
+from common.Constants import ServiceNameEnum
+from common.Settings import (
+    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
+    wait_for_environment_variables)
+from monitoring.proto import monitoring_pb2
+from .EventTools import EventsDeviceCollector
+from .MonitoringService import MonitoringService
 
 terminate = threading.Event()
 LOGGER = None
-LOCALHOST = '127.0.0.1'
 
-def signal_handler(signal, frame):
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     LOGGER.warning('Terminate signal received')
     terminate.set()
 
 def start_monitoring():
     LOGGER.info('Start Monitoring...',)
 
-    grpc_service_port    = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
-    context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST)
-    context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_GRPC_SERVICE_PORT)
-
-    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    if s.connect_ex((context_service_host, int(context_service_port))) != 0:
-        LOGGER.info('Context service is not reachable')
-        return
-
-    context_client_grpc = ContextClient(address=context_service_host, port=context_service_port)
-    monitoring_client = MonitoringClient(server=LOCALHOST, port=grpc_service_port) # instantiate the client
-    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
+    events_collector = EventsDeviceCollector()
    events_collector.start()
 
     # Iterate while terminate is not set
@@ -64,8 +47,7 @@ def start_monitoring():
                 monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
                 monitor_kpi_request.sampling_duration_s = 86400
                 monitor_kpi_request.sampling_interval_s = 30
-
-                monitoring_client.MonitorKpi(monitor_kpi_request)
+                events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
         else:
             # Terminate is set, looping terminates
             LOGGER.warning("Stopping execution...")
@@ -73,31 +55,28 @@ def start_monitoring():
     events_collector.start()
 
 def main():
-    global LOGGER
+    global LOGGER # pylint: disable=global-statement
 
-    grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
-    max_workers       = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS)
-    grace_period      = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD)
-    log_level         = get_setting('LOG_LEVEL', default=LOG_LEVEL)
-    metrics_port      = get_setting('METRICS_PORT', default=METRICS_PORT)
-
+    log_level = get_log_level()
     logging.basicConfig(level=log_level)
     LOGGER = logging.getLogger(__name__)
 
     wait_for_environment_variables([
-        'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
-        'DEVICESERVICE_SERVICE_HOST', 'DEVICESERVICE_SERVICE_PORT_GRPC'
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
     ])
 
     signal.signal(signal.SIGINT,  signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
     LOGGER.info('Starting...')
 
     # Start metrics server
+    metrics_port = get_metrics_port()
     start_http_server(metrics_port)
 
     # Starting monitoring service
-    grpc_service = MonitoringService(port=grpc_service_port, max_workers=max_workers, grace_period=grace_period)
+    grpc_service = MonitoringService()
     grpc_service.start()
 
     start_monitoring()
@@ -112,4 +91,4 @@ def main():
     return 0
 
 if __name__ == '__main__':
-    sys.exit(main())
\ No newline at end of file
+    sys.exit(main())
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from monitoring.proto import monitoring_pb2
from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType

def kpi():
    _kpi = monitoring_pb2.Kpi()
    _kpi.kpi_id.kpi_id.uuid = 'KPIID0000' # pylint: disable=maybe-no-member
    return _kpi

def kpi_id():
    _kpi_id = monitoring_pb2.KpiId()
    _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member
    return _kpi_id

def create_kpi_request():
    _create_kpi_request = monitoring_pb2.KpiDescriptor()
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV1'     # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV1'  # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member
    return _create_kpi_request

def monitor_kpi_request(kpi_uuid, sampling_duration_s, sampling_interval_s):
    _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
    _monitor_kpi_request.kpi_id.kpi_id.uuid = kpi_uuid # pylint: disable=maybe-no-member
    _monitor_kpi_request.sampling_duration_s = sampling_duration_s
    _monitor_kpi_request.sampling_interval_s = sampling_interval_s
    return _monitor_kpi_request

def include_kpi_request():
    _include_kpi_request = monitoring_pb2.Kpi()
    _include_kpi_request.kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member
    _include_kpi_request.timestamp = "2021-10-12T13:14:42Z"
    _include_kpi_request.kpi_value.intVal = 500 # pylint: disable=maybe-no-member
    return _include_kpi_request
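The commit message mentions improved unitary tests; a pytest-style sketch of how the message factories above could be exercised against the new MonitoringClient follows. The fixture, endpoint values, and assertions are illustrative assumptions, not the actual test code of this commit.

import pytest
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.proto import monitoring_pb2

@pytest.fixture(scope='session')
def monitoring_client():
    # Assumes a Monitoring service is already running at this (illustrative) endpoint.
    client = MonitoringClient(host='127.0.0.1', port='7070')
    yield client
    client.close()

def test_create_and_monitor_kpi(monitoring_client):
    # create_kpi_request / monitor_kpi_request are the factories defined above;
    # in a real test module they would be imported from the messages file.
    kpi_id = monitoring_client.CreateKpi(create_kpi_request())
    assert isinstance(kpi_id, monitoring_pb2.KpiId)
    assert kpi_id.kpi_id.uuid != ''

    # Ask for periodic sampling of the new KPI: one day, every 30 seconds.
    monitoring_client.MonitorKpi(monitor_kpi_request(kpi_id.kpi_id.uuid, 86400, 30))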
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.tools.object_factory.Device import (
    json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled)
from context.proto.kpi_sample_types_pb2 import KpiSampleType

PACKET_PORT_SAMPLE_TYPES = [
    KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED,
    KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED,
]
DEVICE_DEV1_UUID = 'DEV1'
ENDPOINT_END1_UUID = 'END1'
DEVICE_DEV1_ENDPOINT_DEFS = [(ENDPOINT_END1_UUID, 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_DEV1 = json_device_emulated_packet_router_disabled(DEVICE_DEV1_UUID)
DEVICE_DEV1_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_DEV1_ENDPOINT_DEFS)