diff --git a/src/context/Config.py b/src/context/Config.py index 328610fc81561f60d97b8ef3080930c4affce20e..6f5d1dc0b347dc5db27a2cfae973a4e5bdf7b4cc 100644 --- a/src/context/Config.py +++ b/src/context/Config.py @@ -12,22 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -# General settings -LOG_LEVEL = logging.INFO - -# gRPC settings -GRPC_SERVICE_PORT = 1010 -GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods -GRPC_GRACE_PERIOD = 60 - -# REST-API settings -RESTAPI_SERVICE_PORT = 8080 -RESTAPI_BASE_URL = '/api' - -# Prometheus settings -METRICS_PORT = 9192 - # Autopopulate the component with fake data for testing purposes? POPULATE_FAKE_DATA = False diff --git a/src/context/client/ContextClient.py b/src/context/client/ContextClient.py index 3206e4a366ef2f6cb7d3aeb366b287572b8d49da..34214fac00d03f5d7595bf118b35026642ba9426 100644 --- a/src/context/client/ContextClient.py +++ b/src/context/client/ContextClient.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterator import grpc, logging +from typing import Iterator +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from context.proto.context_pb2 import ( @@ -29,9 +31,11 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class ContextClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) - LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.CONTEXT) + if not port: port = get_service_port_grpc(ServiceNameEnum.CONTEXT) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None self.stub = None self.connect() diff --git a/src/context/service/Populate.py b/src/context/service/Populate.py index f4630182d864a891095caa6689dde9f656ea1c0e..ace630900d82fc36a82d290f12e466018dc82587 100644 --- a/src/context/service/Populate.py +++ b/src/context/service/Populate.py @@ -20,8 +20,8 @@ from context.tests.Objects import ( LINK_R1_R2, LINK_R1_R2_ID, LINK_R1_R3, LINK_R1_R3_ID, LINK_R2_R3, LINK_R2_R3_ID, SERVICE_R1_R2, SERVICE_R1_R3, SERVICE_R2_R3) -def populate(address, port): - client = ContextClient(address=address, port=port) +def populate(host=None, port=None): + client = ContextClient(host=host, port=port) client.SetContext(Context(**CONTEXT)) client.SetTopology(Topology(**TOPOLOGY)) diff --git a/src/context/service/__main__.py b/src/context/service/__main__.py index 180a1f44cb6a37b487e6bce0a13706952ff73bc2..53754caf4f9d2621ed8a6fdfd325d42f77f44a4f 100644 --- a/src/context/service/__main__.py +++ b/src/context/service/__main__.py @@ -14,17 +14,15 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from common.Settings import get_setting +from common.Settings import get_log_level, get_metrics_port, get_setting from common.orm.Database import Database from common.orm.Factory import get_database_backend from common.message_broker.Factory import get_messagebroker_backend from common.message_broker.MessageBroker import MessageBroker -from context.Config import ( - GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, POPULATE_FAKE_DATA, RESTAPI_SERVICE_PORT, - RESTAPI_BASE_URL, METRICS_PORT) +from context.Config import POPULATE_FAKE_DATA from .grpc_server.ContextService import ContextService from .rest_server.Resources import RESOURCES -from .rest_server.Server import Server +from .rest_server.RestServer import RestServer from .Populate import populate terminate = threading.Event() @@ -37,16 +35,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - grpc_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD ) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - restapi_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_HTTP', default=RESTAPI_SERVICE_PORT) - restapi_base_url = get_setting('RESTAPI_BASE_URL', default=RESTAPI_BASE_URL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) - populate_fake_data = get_setting('POPULATE_FAKE_DATA', default=POPULATE_FAKE_DATA ) - if isinstance(populate_fake_data, str): populate_fake_data = (populate_fake_data.upper() in {'T', '1', 'TRUE'}) - + log_level = get_log_level() logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) @@ -56,6 +45,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server + metrics_port = get_metrics_port() start_http_server(metrics_port) # Get database instance @@ -65,18 +55,19 @@ def main(): messagebroker = MessageBroker(get_messagebroker_backend()) # Starting context service - grpc_service = ContextService( - database, messagebroker, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) + grpc_service = ContextService(database, messagebroker) grpc_service.start() - rest_server = Server(port=restapi_service_port, base_url=restapi_base_url) + rest_server = RestServer() for endpoint_name, resource_class, resource_url in RESOURCES: rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,)) rest_server.start() + populate_fake_data = get_setting('POPULATE_FAKE_DATA', default=POPULATE_FAKE_DATA) + if isinstance(populate_fake_data, str): populate_fake_data = (populate_fake_data.upper() in {'T', '1', 'TRUE'}) if populate_fake_data: LOGGER.info('Populating fake data...') - populate('127.0.0.1', grpc_service_port) + populate(host='127.0.0.1', port=grpc_service.bind_port) LOGGER.info('Fake Data populated') # Wait for Ctrl+C or termination signal diff --git a/src/context/service/grpc_server/ContextService.py b/src/context/service/grpc_server/ContextService.py index 87ca94a70aa2e1733b8ec443c70a4623d2e0c471..c338b0f0d499e3c9a2a32a8ca77e386333af0456 100644 --- a/src/context/service/grpc_server/ContextService.py +++ b/src/context/service/grpc_server/ContextService.py @@ -12,61 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc -import logging -from concurrent import futures -from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server -from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.message_broker.MessageBroker import MessageBroker +from common.orm.Database import Database +from common.tools.service.GenericGrpcService import GenericGrpcService from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from .ContextServiceServicerImpl import ContextServiceServicerImpl -BIND_ADDRESS = '0.0.0.0' -LOGGER = logging.getLogger(__name__) +# Custom gRPC settings +GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods -class ContextService: - def __init__( - self, database, messagebroker, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): +class ContextService(GenericGrpcService): + def __init__(self, database : Database, messagebroker : MessageBroker, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.CONTEXT) + super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name) + self.context_servicer = ContextServiceServicerImpl(database, messagebroker) - self.database = database - self.messagebroker = messagebroker - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.context_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.context_servicer = ContextServiceServicerImpl(self.database, self.messagebroker) + def install_servicers(self): add_ContextServiceServicer_to_server(self.context_servicer, self.server) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) - LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) - self.server.start() - self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member - - LOGGER.debug('Service started') - - def stop(self): - LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug('Service stopped') diff --git a/src/context/service/rest_server/RestServer.py b/src/context/service/rest_server/RestServer.py new file mode 100644 index 0000000000000000000000000000000000000000..289e92a3c1b74e207a261b133130a551c3c55918 --- /dev/null +++ b/src/context/service/rest_server/RestServer.py @@ -0,0 +1,23 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import ServiceNameEnum +from common.Settings import get_service_baseurl_http, get_service_port_http +from common.tools.service.GenericRestServer import GenericRestServer + +class RestServer(GenericRestServer): + def __init__(self, cls_name: str = __name__) -> None: + bind_port = get_service_port_http(ServiceNameEnum.CONTEXT) + base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT) + super().__init__(bind_port, base_url, cls_name=cls_name) diff --git a/src/context/service/rest_server/Server.py b/src/context/service/rest_server/Server.py deleted file mode 100644 index ac4888d41bd9a84c57fc2d2f308bde4558787cbc..0000000000000000000000000000000000000000 --- a/src/context/service/rest_server/Server.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, threading -from flask import Flask -from flask_restful import Api -from werkzeug.serving import make_server -from context.Config import RESTAPI_BASE_URL, RESTAPI_SERVICE_PORT - -logging.getLogger('werkzeug').setLevel(logging.WARNING) - -BIND_ADDRESS = '0.0.0.0' -LOGGER = logging.getLogger(__name__) - -class Server(threading.Thread): - def __init__(self, host=BIND_ADDRESS, port=RESTAPI_SERVICE_PORT, base_url=RESTAPI_BASE_URL): - threading.Thread.__init__(self, daemon=True) - self.host = host - self.port = port - self.base_url = base_url - self.srv = None - self.ctx = None - self.app = Flask(__name__) - self.api = Api(self.app, prefix=self.base_url) - - def add_resource(self, resource, *urls, **kwargs): - self.api.add_resource(resource, *urls, **kwargs) - - def run(self): - self.srv = make_server(self.host, self.port, self.app, threaded=True) - self.ctx = self.app.app_context() - self.ctx.push() - - endpoint = 'http://{:s}:{:s}{:s}'.format(str(self.host), str(self.port), str(self.base_url)) - LOGGER.info('Listening on {:s}...'.format(str(endpoint))) - self.srv.serve_forever() - - def shutdown(self): - self.srv.shutdown() diff --git a/src/context/tests/test_unitary.py b/src/context/tests/test_unitary.py index 10f44d9ad87a71d5935151f4ae724e9d04b5d5ce..870067d1e662baf1cd9e537a7c7a58beb62fdd08 100644 --- a/src/context/tests/test_unitary.py +++ b/src/context/tests/test_unitary.py @@ -15,7 +15,8 @@ # pylint: disable=too-many-lines import copy, grpc, logging, os, pytest, requests, time, urllib from typing import Tuple -from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID +from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_baseurl_http, get_service_port_grpc, get_service_port_http from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum @@ -25,8 +26,6 @@ from common.type_checkers.Assertions import ( validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids, validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology, validate_topology_ids) -from context.Config import ( - GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL) from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from context.proto.context_pb2 import ( @@ -37,7 +36,7 @@ from context.service.database.Tools import ( FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher) from context.service.grpc_server.ContextService import ContextService from context.service.Populate import populate -from context.service.rest_server.Server import Server as RestServer +from context.service.rest_server.RestServer import RestServer from context.service.rest_server.Resources import RESOURCES from .Objects import ( CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID, @@ -48,10 +47,15 @@ from .Objects import ( LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -GRPC_PORT = 10000 + GRPC_SERVICE_PORT # avoid privileged ports -RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports +LOCAL_HOST = '127.0.0.1' +GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports +HTTP_PORT = 10000 + get_service_port_http(ServiceNameEnum.CONTEXT) # avoid privileged ports -DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT) +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT) + +DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST DEFAULT_REDIS_SERVICE_PORT = 6379 DEFAULT_REDIS_DATABASE_ID = 0 @@ -78,9 +82,7 @@ def context_db_mb(request) -> Tuple[Database, MessageBroker]: @pytest.fixture(scope='session') def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - _service = ContextService( - context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD) + _service = ContextService(context_db_mb[0], context_db_mb[1]) _service.start() yield _service _service.stop() @@ -88,7 +90,7 @@ def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pyli @pytest.fixture(scope='session') def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name database = context_db_mb[0] - _rest_server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL) + _rest_server = RestServer() for endpoint_name, resource_class, resource_url in RESOURCES: _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,)) _rest_server.start() @@ -99,12 +101,13 @@ def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pyli @pytest.fixture(scope='session') def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient(address='127.0.0.1', port=GRPC_PORT) + _client = ContextClient() yield _client _client.close() def do_rest_request(url : str): - request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url) + base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT) + request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url) LOGGER.warning('Request: GET {:s}'.format(str(request_url))) reply = requests.get(request_url) LOGGER.warning('Reply: {:s}'.format(str(reply.text))) @@ -1172,7 +1175,7 @@ def test_rest_populate_database( ): database = context_db_mb[0] database.clear_all() - populate('127.0.0.1', GRPC_PORT) + populate(LOCAL_HOST, GRPC_PORT) def test_rest_get_context_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name reply = do_rest_request('/context_ids')