Skip to content
Snippets Groups Projects
Commit 11a69c44 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- Migrated to use new generic gRPC servicer
- Migrated to use new generic Rest servicer
- Migrated to use new settings framework
parent 5dc77fe4
No related branches found
No related tags found
1 merge request!54Release 2.0.0
......@@ -12,22 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# General settings
LOG_LEVEL = logging.INFO
# gRPC settings
GRPC_SERVICE_PORT = 1010
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods
GRPC_GRACE_PERIOD = 60
# REST-API settings
RESTAPI_SERVICE_PORT = 8080
RESTAPI_BASE_URL = '/api'
# Prometheus settings
METRICS_PORT = 9192
# Autopopulate the component with fake data for testing purposes?
POPULATE_FAKE_DATA = False
......@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
import grpc, logging
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.proto.context_pb2 import (
......@@ -29,9 +31,11 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class ContextClient:
def __init__(self, address, port):
self.endpoint = '{:s}:{:s}'.format(str(address), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.CONTEXT)
if not port: port = get_service_port_grpc(ServiceNameEnum.CONTEXT)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
......
......@@ -20,8 +20,8 @@ from context.tests.Objects import (
LINK_R1_R2, LINK_R1_R2_ID, LINK_R1_R3, LINK_R1_R3_ID, LINK_R2_R3, LINK_R2_R3_ID, SERVICE_R1_R2, SERVICE_R1_R3,
SERVICE_R2_R3)
def populate(address, port):
client = ContextClient(address=address, port=port)
def populate(host=None, port=None):
client = ContextClient(host=host, port=port)
client.SetContext(Context(**CONTEXT))
client.SetTopology(Topology(**TOPOLOGY))
......
......@@ -14,17 +14,15 @@
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting
from common.Settings import get_log_level, get_metrics_port, get_setting
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.message_broker.Factory import get_messagebroker_backend
from common.message_broker.MessageBroker import MessageBroker
from context.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, POPULATE_FAKE_DATA, RESTAPI_SERVICE_PORT,
RESTAPI_BASE_URL, METRICS_PORT)
from context.Config import POPULATE_FAKE_DATA
from .grpc_server.ContextService import ContextService
from .rest_server.Resources import RESOURCES
from .rest_server.Server import Server
from .rest_server.RestServer import RestServer
from .Populate import populate
terminate = threading.Event()
......@@ -37,16 +35,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
def main():
global LOGGER # pylint: disable=global-statement
grpc_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
restapi_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_HTTP', default=RESTAPI_SERVICE_PORT)
restapi_base_url = get_setting('RESTAPI_BASE_URL', default=RESTAPI_BASE_URL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
populate_fake_data = get_setting('POPULATE_FAKE_DATA', default=POPULATE_FAKE_DATA )
if isinstance(populate_fake_data, str): populate_fake_data = (populate_fake_data.upper() in {'T', '1', 'TRUE'})
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
......@@ -56,6 +45,7 @@ def main():
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Get database instance
......@@ -65,18 +55,19 @@ def main():
messagebroker = MessageBroker(get_messagebroker_backend())
# Starting context service
grpc_service = ContextService(
database, messagebroker, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period)
grpc_service = ContextService(database, messagebroker)
grpc_service.start()
rest_server = Server(port=restapi_service_port, base_url=restapi_base_url)
rest_server = RestServer()
for endpoint_name, resource_class, resource_url in RESOURCES:
rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
rest_server.start()
populate_fake_data = get_setting('POPULATE_FAKE_DATA', default=POPULATE_FAKE_DATA)
if isinstance(populate_fake_data, str): populate_fake_data = (populate_fake_data.upper() in {'T', '1', 'TRUE'})
if populate_fake_data:
LOGGER.info('Populating fake data...')
populate('127.0.0.1', grpc_service_port)
populate(host='127.0.0.1', port=grpc_service.bind_port)
LOGGER.info('Fake Data populated')
# Wait for Ctrl+C or termination signal
......
......@@ -12,61 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import logging
from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.orm.Database import Database
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from .ContextServiceServicerImpl import ContextServiceServicerImpl
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
# Custom gRPC settings
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods
class ContextService:
def __init__(
self, database, messagebroker, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
grace_period=GRPC_GRACE_PERIOD):
class ContextService(GenericGrpcService):
def __init__(self, database : Database, messagebroker : MessageBroker, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.CONTEXT)
super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
self.context_servicer = ContextServiceServicerImpl(database, messagebroker)
self.database = database
self.messagebroker = messagebroker
self.address = address
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.context_servicer = None
self.health_servicer = None
self.pool = None
self.server = None
def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers)))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
self.context_servicer = ContextServiceServicerImpl(self.database, self.messagebroker)
def install_servicers(self):
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
self.health_servicer = HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
self.server.start()
self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
self.health_servicer.enter_graceful_shutdown()
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')
......@@ -12,39 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading
from flask import Flask
from flask_restful import Api
from werkzeug.serving import make_server
from context.Config import RESTAPI_BASE_URL, RESTAPI_SERVICE_PORT
logging.getLogger('werkzeug').setLevel(logging.WARNING)
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
class Server(threading.Thread):
def __init__(self, host=BIND_ADDRESS, port=RESTAPI_SERVICE_PORT, base_url=RESTAPI_BASE_URL):
threading.Thread.__init__(self, daemon=True)
self.host = host
self.port = port
self.base_url = base_url
self.srv = None
self.ctx = None
self.app = Flask(__name__)
self.api = Api(self.app, prefix=self.base_url)
def add_resource(self, resource, *urls, **kwargs):
self.api.add_resource(resource, *urls, **kwargs)
def run(self):
self.srv = make_server(self.host, self.port, self.app, threaded=True)
self.ctx = self.app.app_context()
self.ctx.push()
endpoint = 'http://{:s}:{:s}{:s}'.format(str(self.host), str(self.port), str(self.base_url))
LOGGER.info('Listening on {:s}...'.format(str(endpoint)))
self.srv.serve_forever()
def shutdown(self):
self.srv.shutdown()
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
from common.tools.service.GenericRestServer import GenericRestServer
class RestServer(GenericRestServer):
def __init__(self, cls_name: str = __name__) -> None:
bind_port = get_service_port_http(ServiceNameEnum.CONTEXT)
base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
super().__init__(bind_port, base_url, cls_name=cls_name)
......@@ -15,7 +15,8 @@
# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
from typing import Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_baseurl_http, get_service_port_grpc, get_service_port_http
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
......@@ -25,8 +26,6 @@ from common.type_checkers.Assertions import (
validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
validate_topology_ids)
from context.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.proto.context_pb2 import (
......@@ -37,7 +36,7 @@ from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.RestServer import RestServer
from context.service.rest_server.Resources import RESOURCES
from .Objects import (
CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
......@@ -48,10 +47,15 @@ from .Objects import (
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
GRPC_PORT = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
HTTP_PORT = 10000 + get_service_port_http(ServiceNameEnum.CONTEXT) # avoid privileged ports
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0
......@@ -78,9 +82,7 @@ def context_db_mb(request) -> Tuple[Database, MessageBroker]:
@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
_service = ContextService(
context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS,
grace_period=GRPC_GRACE_PERIOD)
_service = ContextService(context_db_mb[0], context_db_mb[1])
_service.start()
yield _service
_service.stop()
......@@ -88,7 +90,7 @@ def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pyli
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
database = context_db_mb[0]
_rest_server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
_rest_server = RestServer()
for endpoint_name, resource_class, resource_url in RESOURCES:
_rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
_rest_server.start()
......@@ -99,12 +101,13 @@ def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pyli
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
_client = ContextClient()
yield _client
_client.close()
def do_rest_request(url : str):
request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
reply = requests.get(request_url)
LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
......@@ -1172,7 +1175,7 @@ def test_rest_populate_database(
):
database = context_db_mb[0]
database.clear_all()
populate('127.0.0.1', GRPC_PORT)
populate(LOCAL_HOST, GRPC_PORT)
def test_rest_get_context_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_request('/context_ids')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment