Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading, socket
from common.Settings import get_setting, wait_for_environment_variables
from context.client.ContextClient import ContextClient
from monitoring.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT, CONTEXT_GRPC_SERVICE_PORT,
CONTEXT_SERVICE_HOST)
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto import monitoring_pb2
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.service.MonitoringService import MonitoringService
from prometheus_client import start_http_server
terminate = threading.Event()
LOGGER = None  # rebound to a real logger by main() once logging is configured

LOCALHOST = '127.0.0.1'

# Sampling parameters requested for every newly discovered KPI.
DEFAULT_SAMPLING_DURATION_S = 300
DEFAULT_SAMPLING_INTERVAL_S = 15


def signal_handler(signum, frame):  # pylint: disable=unused-argument
    """Handle SIGINT/SIGTERM by setting the module-level ``terminate`` event.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Stack frame interrupted by the signal (unused).
    """
    LOGGER.warning('Terminate signal received')
    terminate.set()


def start_monitoring(sampling_duration_s=DEFAULT_SAMPLING_DURATION_S,
                     sampling_interval_s=DEFAULT_SAMPLING_INTERVAL_S):
    """Collect device events from Context and request monitoring of each new KPI.

    Returns immediately if a TCP connection to the Context service cannot be
    opened; otherwise loops until the module-level ``terminate`` event is set.

    Args:
        sampling_duration_s: Value written to ``MonitorKpiRequest.sampling_duration_s``
            for every new KPI.
        sampling_interval_s: Value written to ``MonitorKpiRequest.sampling_interval_s``
            for every new KPI.
    """
    LOGGER.info('Start Monitoring...')

    grpc_service_port    = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT        )
    context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST',         default=CONTEXT_SERVICE_HOST     )
    context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC',    default=CONTEXT_GRPC_SERVICE_PORT)

    # Plain TCP reachability probe of the Context service. The ``with`` block
    # closes the probe socket on every path (previously it was never closed).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        context_unreachable = probe.connect_ex((context_service_host, int(context_service_port))) != 0
    if context_unreachable:
        LOGGER.info('Context service is not reachable')
        return

    context_client_grpc = ContextClient(address=context_service_host, port=context_service_port)
    # Client for the Monitoring service served by this same process.
    monitoring_client = MonitoringClient(server=LOCALHOST, port=grpc_service_port)
    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    try:
        # Iterate while terminate is not set
        while not terminate.is_set():
            # ``or ()`` treats any falsy result (empty list, None) as "no new KPIs".
            for kpi_id in events_collector.listen_events() or ():
                # Create Monitor Kpi Requests
                monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
                monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
                monitor_kpi_request.sampling_duration_s = sampling_duration_s
                monitor_kpi_request.sampling_interval_s = sampling_interval_s
                monitoring_client.MonitorKpi(monitor_kpi_request)
        # Terminate is set, looping terminates
        LOGGER.warning("Stopping execution...")
    finally:
        # Previously this called start() a second time instead of shutting the
        # collector down. NOTE(review): assumes EventsDeviceCollector.stop()
        # exists and ends its event stream — confirm against EventTools.
        events_collector.stop()


def main():
    """Run the Monitoring component until SIGINT/SIGTERM is received.

    Configures logging from settings, starts the Prometheus metrics endpoint
    and the Monitoring gRPC service, runs :func:`start_monitoring`, then shuts
    the gRPC service down.

    Returns:
        Process exit status, always 0 on normal shutdown.
    """
    global LOGGER  # pylint: disable=global-statement

    grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
    max_workers       = get_setting('MAX_WORKERS',                         default=GRPC_MAX_WORKERS )
    grace_period      = get_setting('GRACE_PERIOD',                        default=GRPC_GRACE_PERIOD)
    log_level         = get_setting('LOG_LEVEL',                           default=LOG_LEVEL        )
    metrics_port      = get_setting('METRICS_PORT',                        default=METRICS_PORT     )

    # log_level was previously read but never applied.
    logging.basicConfig(level=log_level)
    # Rebind the module-level name: signal_handler() and start_monitoring() log through it.
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
        'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
        'DEVICESERVICE_SERVICE_HOST', 'DEVICESERVICE_SERVICE_PORT_GRPC'
    ])

    signal.signal(signal.SIGINT,  signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')
    # Start metrics server. The setting may come back as a string (the Context
    # port is int()-converted in start_monitoring() for the same reason).
    start_http_server(int(metrics_port))

    # Starting monitoring service
    # NOTE(review): assumes MonitoringService exposes start()/stop(); the
    # instance was previously constructed but never started or stopped.
    grpc_service = MonitoringService(port=grpc_service_port, max_workers=max_workers, grace_period=grace_period)
    grpc_service.start()
    try:
        # Loops until terminate is set, or returns at once if Context is unreachable.
        start_monitoring()

        # Wait for Ctrl+C or termination signal. Short timed waits give the
        # interpreter regular chances to run the Python-level signal handler.
        while not terminate.wait(timeout=0.1):
            pass
    finally:
        LOGGER.info('Terminating...')
        grpc_service.stop()

    return 0


if __name__ == '__main__':
    sys.exit(main())