Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom
committed
from prometheus_client import start_http_server
from common.Settings import get_setting, wait_for_environment_variables
from context.client.ContextClient import ContextClient
from device.Config import (
CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL,
METRICS_PORT, MONITORING_SERVICE_HOST, MONITORING_SERVICE_PORT)
from monitoring.client.monitoring_client import MonitoringClient
from .DeviceService import DeviceService
from .driver_api.DriverFactory import DriverFactory
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .drivers import DRIVERS

Lluis Gifre Renom
committed
terminate = threading.Event()
LOGGER : logging.Logger = None

Lluis Gifre Renom
committed
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')

Lluis Gifre Renom
committed
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement

Lluis Gifre Renom
committed
grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level)
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
logging.getLogger('monitoring-client').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
'MONITORINGSERVICE_SERVICE_HOST', 'MONITORINGSERVICE_SERVICE_PORT_GRPC'
])
context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST )
context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT )
monitoring_service_host = get_setting('MONITORINGSERVICE_SERVICE_HOST', default=MONITORING_SERVICE_HOST)
monitoring_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=MONITORING_SERVICE_PORT)

Lluis Gifre Renom
committed
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')

Lluis Gifre Renom
committed
# Start metrics server
start_http_server(metrics_port)
# Initialize Context Client
if context_service_host is None or context_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Context component'.format(
str(context_service_host), str(context_service_port)))
context_client = ContextClient(context_service_host, context_service_port)
# Initialize Monitoring Client
if monitoring_service_host is None or monitoring_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format(
str(monitoring_service_host), str(monitoring_service_port)))
monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port)
# Initialize Driver framework
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)

Lluis Gifre Renom
committed
# Starting device service
grpc_service = DeviceService(
context_client, monitoring_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers,
grace_period=grace_period)
grpc_service.start()

Lluis Gifre Renom
committed
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass

Lluis Gifre Renom
committed
LOGGER.info('Terminating...')
grpc_service.stop()
driver_instance_cache.terminate()

Lluis Gifre Renom
committed
LOGGER.info('Bye')
return 0

Lluis Gifre Renom
committed
if __name__ == '__main__':
sys.exit(main())