# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import signal
import sys
import threading
import socket

from common.Settings import get_setting, wait_for_environment_variables
from context.client.ContextClient import ContextClient
from monitoring.Config import (
    GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT,
    CONTEXT_GRPC_SERVICE_PORT, CONTEXT_SERVICE_HOST)
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto import monitoring_pb2
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.service.MonitoringService import MonitoringService
from prometheus_client import start_http_server

# Set by the signal handler; polled by the monitoring loop and by main().
terminate = threading.Event()

# Assigned in main() once logging has been configured.
LOGGER = None

# Address used by the local MonitoringClient to reach the gRPC service started in main().
LOCALHOST = '127.0.0.1'


def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
    """Flag the shared ``terminate`` event when a termination signal arrives.

    Registered in main() for SIGINT and SIGTERM. The parameter names are kept
    as in the original handler; both arguments are unused.
    """
    LOGGER.warning('Terminate signal received')
    terminate.set()


def start_monitoring():
    """Listen for events and request KPI monitoring until termination.

    Resolves the monitoring and context service endpoints from settings,
    checks that the context service accepts TCP connections, and then loops
    until ``terminate`` is set: every KPI id returned by
    ``events_collector.listen_events()`` is turned into a
    ``MonitorKpiRequest`` and sent through the local monitoring client.

    Returns early (after logging) when the context service is not reachable.
    """
    LOGGER.info('Start Monitoring...')

    grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
    context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST)
    context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_GRPC_SERVICE_PORT)

    # Probe the context service with a plain TCP connect. The context manager
    # closes the probe socket on every path (it used to be leaked).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        reachable = probe.connect_ex((context_service_host, int(context_service_port))) == 0
    if not reachable:
        LOGGER.info('Context service is not reachable')
        return

    context_client_grpc = ContextClient(address=context_service_host, port=context_service_port)
    monitoring_client = MonitoringClient(server=LOCALHOST, port=grpc_service_port)

    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    # Iterate while terminate is not set
    while not terminate.is_set():
        list_new_kpi_ids = events_collector.listen_events()

        # Nothing new reported in this round.
        if not list_new_kpi_ids:
            continue

        for kpi_id in list_new_kpi_ids:
            # Create and send one Monitor Kpi Request per new KPI id.
            monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
            monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
            # Field names suggest both values are expressed in seconds.
            monitor_kpi_request.sampling_duration_s = 300
            monitor_kpi_request.sampling_interval_s = 15
            monitoring_client.MonitorKpi(monitor_kpi_request)

    # Terminate is set, looping terminates
    LOGGER.warning("Stopping execution...")

    # NOTE(review): the original called events_collector.start() again here,
    # which re-starts the collector instead of shutting it down. stop() is
    # assumed to be the matching shutdown method on EventsDeviceCollector --
    # confirm against monitoring.service.EventTools.
    events_collector.stop()


def main():
    """Configure and run the monitoring service until a termination signal.

    Reads settings, configures logging, waits for the required environment
    variables, installs the signal handlers, starts the Prometheus metrics
    endpoint and the gRPC monitoring service, runs the monitoring loop, and
    finally stops the gRPC service.

    Returns:
        0, used as the process exit code.
    """
    global LOGGER  # pylint: disable=global-statement

    grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
    max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS)
    grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD)
    log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL)
    metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT)

    logging.basicConfig(level=log_level)
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
        'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
        'DEVICESERVICE_SERVICE_HOST', 'DEVICESERVICE_SERVICE_PORT_GRPC',
    ])

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')

    # Start metrics server
    start_http_server(metrics_port)

    # Starting monitoring service
    grpc_service = MonitoringService(port=grpc_service_port, max_workers=max_workers, grace_period=grace_period)
    grpc_service.start()

    # Runs its own loop until terminate is set, or returns immediately when
    # the context service is unreachable.
    start_monitoring()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=0.1):
        pass

    LOGGER.info('Terminating...')
    grpc_service.stop()

    LOGGER.info('Bye')
    return 0


if __name__ == '__main__':
    sys.exit(main())