Skip to content
Snippets Groups Projects
Commit 3cf0b0b1 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/compute-add-rest-api' into 'develop'

Added REST API Server to Compute component

See merge request teraflow-h2020/controller!17
parents 99fbcd44 f0d6c44e
No related branches found
No related tags found
1 merge request!54Release 2.0.0
Showing
with 210 additions and 51 deletions
import os
def get_setting(name, **kwargs):
    """Return the value of setting *name*.

    The value is taken from the environment; when a ``settings`` mapping is
    supplied as a keyword argument, an entry for *name* is popped from it and
    takes precedence over the environment. When neither yields a value, the
    ``default`` keyword argument (if given) is returned; otherwise an
    Exception is raised.
    """
    resolved = os.environ.get(name)
    if 'settings' in kwargs:
        # A per-call settings dict overrides the environment; the entry is
        # consumed (popped) so leftover settings can be detected by callers.
        resolved = kwargs['settings'].pop(name, resolved)
    if resolved is None:
        if 'default' in kwargs:
            return kwargs['default']
        raise Exception('Setting({}) not specified in environment or configuration'.format(name))
    return resolved
import logging, os
import logging
from enum import Enum
from common.Settings import get_setting
from common.database.api.Database import Database
from common.database.engines.inmemory.InMemoryDatabaseEngine import InMemoryDatabaseEngine
from common.database.engines.redis.RedisDatabaseEngine import RedisDatabaseEngine
......@@ -29,7 +30,7 @@ def get_database(engine=None, **settings) -> Database:
# 1. user selected by parameter (engine=...)
# 2. environment variable DB_ENGINE
# 3. default engine: INMEMORY
if engine is None: engine = os.environ.get('DB_ENGINE', DEFAULT_DB_ENGINE)
if engine is None: engine = get_setting('DB_ENGINE', default=DEFAULT_DB_ENGINE)
if engine is None: raise Exception('Database Engine not specified')
if isinstance(engine, DatabaseEngineEnum): engine = engine.value
engine_class = ENGINES.get(engine)
......
import os, uuid
import uuid
from typing import Dict, List, Set, Tuple
from redis.client import Redis
from common.Settings import get_setting
from common.database.engines._DatabaseEngine import _DatabaseEngine
from common.database.engines.redis.Mutex import Mutex
KEY_ENTIRE_DATABASE_LOCK = 'everything'
def get_setting(settings, name):
    """Pop setting *name* from *settings*, falling back to the environment.

    The entry is consumed from the *settings* mapping when present; otherwise
    the environment variable of the same name is used. Raises an Exception
    when the setting is defined in neither place.
    """
    env_value = os.environ.get(name, None)
    resolved = settings.pop(name, env_value)
    if resolved is None:
        raise Exception('Setting({}) not specified in environment or configuration'.format(name))
    return resolved
class RedisDatabaseEngine(_DatabaseEngine):
    """Database engine backed by a Redis server.

    Connection parameters (host, port, database id) are resolved via the
    shared ``common.Settings.get_setting`` helper, i.e. from the supplied
    ``settings`` keyword arguments or from environment variables.
    """

    def __init__(self, **settings) -> None:
        # Fix: the merge left the pre-merge get_setting(settings, NAME) calls
        # in place next to the new get_setting(NAME, settings=...) calls. The
        # old positional signature no longer matches the shared helper, so
        # only the new keyword-based form is kept.
        host = get_setting('REDIS_SERVICE_HOST', settings=settings)
        port = get_setting('REDIS_SERVICE_PORT', settings=settings)
        dbid = get_setting('REDIS_DATABASE_ID', settings=settings)
        self._client = Redis.from_url('redis://{host}:{port}/{dbid}'.format(host=host, port=port, dbid=dbid))
        # Mutex wraps the client to serialize access to the database;
        # semantics are defined in common.database.engines.redis.Mutex.
        self._mutex = Mutex(self._client)
......
......@@ -8,5 +8,9 @@ GRPC_SERVICE_PORT = 9090
GRPC_MAX_WORKERS = 10
GRPC_GRACE_PERIOD = 60
# REST-API settings
RESTAPI_SERVICE_PORT = 8080
RESTAPI_BASE_URL = '/api'
# Prometheus settings
METRICS_PORT = 9192
import logging, os, signal, sys, threading
import logging, signal, sys, threading
from prometheus_client import start_http_server
from compute.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT
from common.Settings import get_setting
from compute.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, RESTAPI_SERVICE_PORT, \
RESTAPI_BASE_URL, METRICS_PORT
from compute.service.ComputeService import ComputeService
from compute.service.rest_server.Server import Server
from compute.service.rest_server.resources.Compute import Compute
terminate = threading.Event()
logger = None
......@@ -14,11 +18,13 @@ def signal_handler(signal, frame):
def main():
global terminate, logger
grpc_service_port = os.environ.get('COMPUTESERVICE_SERVICE_PORT_GRPC', GRPC_SERVICE_PORT )
max_workers = os.environ.get('MAX_WORKERS', GRPC_MAX_WORKERS )
grace_period = os.environ.get('GRACE_PERIOD', GRPC_GRACE_PERIOD )
log_level = os.environ.get('LOG_LEVEL', LOG_LEVEL )
metrics_port = os.environ.get('METRICS_PORT', METRICS_PORT )
grpc_service_port = get_setting('COMPUTESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
restapi_service_port = get_setting('RESTAPI_SERVICE_PORT', default=RESTAPI_SERVICE_PORT)
restapi_base_url = get_setting('RESTAPI_BASE_URL', default=RESTAPI_BASE_URL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
......@@ -35,11 +41,18 @@ def main():
grpc_service = ComputeService(port=grpc_service_port, max_workers=max_workers, grace_period=grace_period)
grpc_service.start()
rest_server = Server(port=restapi_service_port, base_url=restapi_base_url)
rest_server.add_resource(
Compute, '/restconf/config/compute', endpoint='api.compute')
rest_server.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass
logger.info('Terminating...')
grpc_service.stop()
rest_server.shutdown()
rest_server.join()
logger.info('Bye')
return 0
......
import logging, threading
from flask import Flask
from flask_restful import Api
from werkzeug.serving import make_server
from compute.Config import RESTAPI_BASE_URL, RESTAPI_SERVICE_PORT
logging.getLogger('werkzeug').setLevel(logging.WARNING)
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
class Server(threading.Thread):
    """Threaded Flask/flask-restful REST API server.

    Runs a werkzeug WSGI server on a daemon background thread so the hosting
    process can keep running its gRPC services concurrently. Resources must
    be registered with add_resource() before start() is called.
    """

    def __init__(self, host=BIND_ADDRESS, port=RESTAPI_SERVICE_PORT, base_url=RESTAPI_BASE_URL):
        threading.Thread.__init__(self, daemon=True)
        self.host = host
        self.port = port
        self.base_url = base_url
        # Bug fix: self.srv/self.ctx were only created inside run(); calling
        # shutdown() before the thread started raised AttributeError.
        self.srv = None
        self.ctx = None
        self.app = Flask(__name__)
        # All resources are registered under the common base_url prefix.
        self.api = Api(self.app, prefix=self.base_url)

    def add_resource(self, resource, *urls, **kwargs):
        """Register a flask-restful resource under the configured prefix."""
        self.api.add_resource(resource, *urls, **kwargs)

    def run(self):
        # threaded=True lets werkzeug handle concurrent requests.
        self.srv = make_server(self.host, self.port, self.app, threaded=True)
        self.ctx = self.app.app_context()
        self.ctx.push()
        endpoint = 'http://{}:{}{}'.format(self.host, self.port, self.base_url)
        LOGGER.info('Listening on {}...'.format(endpoint))
        self.srv.serve_forever()

    def shutdown(self):
        """Stop serve_forever(); safe to call even if run() never started."""
        if self.srv is not None: self.srv.shutdown()
import logging
from flask.json import jsonify
from flask_restful import Resource
from common.Settings import get_setting
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID
from service.client.ServiceClient import ServiceClient
from service.proto.service_pb2 import Service, ServiceStateEnum, ServiceType
LOGGER = logging.getLogger(__name__)
class Compute(Resource):
    """REST API resource issuing a dummy CreateService request to the
    Service component over gRPC."""

    def __init__(self) -> None:
        super().__init__()

    def get(self):
        # Here implement HTTP GET method
        raise NotImplementedError()

    def post(self):
        # Here implement HTTP POST method
        # Identifiers the request body should eventually provide.
        ctx_uuid = DEFAULT_CONTEXT_ID
        svc_uuid = 'my-service-id'
        # Find Service address/port from environment and instantiate client
        svc_host = get_setting('SERVICESERVICE_SERVICE_HOST')
        svc_port = get_setting('SERVICESERVICE_SERVICE_PORT_GRPC')
        client = ServiceClient(svc_host, svc_port)
        # Compose a dummy CreateService request
        create_request = Service()
        create_request.cs_id.contextId.contextUuid.uuid = ctx_uuid
        create_request.cs_id.cs_id.uuid = svc_uuid
        create_request.serviceType = ServiceType.L2NM
        create_request.serviceState.serviceState = ServiceStateEnum.PLANNED
        # Service component expects a non-empty config in creation, behaviour can be changed, if needed.
        create_request.serviceConfig.serviceConfig = ' '
        try:
            # Issue gRPC request to Service component and check the echoed
            # service/context identifiers are the expected ones.
            grpc_reply = client.CreateService(create_request)
            same_context = (grpc_reply.contextId.contextUuid.uuid == ctx_uuid)
            same_service = (grpc_reply.cs_id.uuid == svc_uuid)
            response = {'succeeded': same_context and same_service}
        except Exception as e:
            LOGGER.exception('Something went wrong Creating Service {}'.format(str(create_request)))
            response = {'succeeded': False, 'error': str(e)}
        return jsonify(response)
import logging, pytest
import logging, os, pytest, requests, time
from google.protobuf.json_format import MessageToDict
from common.database.Factory import get_database, DatabaseEngineEnum
from common.database.api.Database import Database
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID
from common.tests.Assertions import validate_service_id
from compute.client.ComputeClient import ComputeClient
from compute.proto.service_pb2 import Service
from compute.proto.service_pb2 import Service, ServiceId
from compute.service.ComputeService import ComputeService
from compute.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from compute.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, \
RESTAPI_BASE_URL
from compute.service.rest_server.Server import Server
from compute.service.rest_server.resources.Compute import Compute
from service.service.ServiceService import ServiceService
from service.Config import GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT, \
GRPC_MAX_WORKERS as SERVICE_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as SERVICE_GRPC_GRACE_PERIOD
grpc_port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
compute_grpc_port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
compute_restapi_port = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
service_grpc_port = 10000 + SERVICE_GRPC_SERVICE_PORT # avoid privileged ports
os.environ['SERVICESERVICE_SERVICE_HOST'] = '127.0.0.1'
os.environ['SERVICESERVICE_SERVICE_PORT_GRPC'] = str(service_grpc_port)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
def compute_service():
_service = ComputeService(port=grpc_port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
def database():
_database = get_database(engine=DatabaseEngineEnum.INMEMORY)
_database.context(DEFAULT_CONTEXT_ID).create()
return _database
@pytest.fixture(scope='session')
def service_service(database):
    # Session-scoped Service component gRPC server backed by the in-memory
    # database fixture; started once for the whole test session and stopped
    # at teardown after the yield.
    _service = ServiceService(
        database, port=service_grpc_port, max_workers=SERVICE_GRPC_MAX_WORKERS, grace_period=SERVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def compute_service(service_service : ServiceService):
    # Session-scoped Compute component gRPC server. Depends on the
    # service_service fixture so the Service component is up before the
    # Compute component (which calls it) starts.
    _service = ComputeService(port=compute_grpc_port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def compute_service_rest():
    # Session-scoped REST API server exposing the Compute resource on a
    # non-privileged port; shut down and joined at teardown.
    _rest_server = Server(port=compute_restapi_port, base_url=RESTAPI_BASE_URL)
    _rest_server.add_resource(
        Compute, '/restconf/config/compute', endpoint='api.compute')
    _rest_server.start()
    time.sleep(1) # give the server some time to start
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()
@pytest.fixture(scope='session')
def compute_client(compute_service):
    # Session-scoped gRPC client connected to the compute_service fixture.
    # Fix: the merge kept the pre-merge line that used the removed module
    # global `grpc_port` (a NameError in the merged file); only the
    # `compute_grpc_port`-based client remains.
    _client = ComputeClient(address='127.0.0.1', port=compute_grpc_port)
    yield _client
    _client.close()
......@@ -31,3 +70,13 @@ def test_dummy_create_connectivity_service(compute_client : ComputeClient):
compute_client.CreateConnectivityService(Service()),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_dummy_create_connectivity_service_rest_api(compute_service_rest : Server):
    # should work
    # Exercise the REST endpoint end-to-end and check the reported outcome.
    endpoint = 'http://127.0.0.1:{}{}/restconf/config/compute'.format(compute_restapi_port, RESTAPI_BASE_URL)
    # here add context of POST request body as JSON
    post_body = {}
    response = requests.post(endpoint, json=post_body)
    payload = response.json()
    assert 'succeeded' in payload
    assert payload['succeeded']
import logging, os, signal, sys, threading
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting
from common.database.Factory import get_database
from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, RESTAPI_SERVICE_PORT, \
RESTAPI_BASE_URL, METRICS_PORT
......@@ -18,13 +19,13 @@ def signal_handler(signal, frame):
def main():
global terminate, logger
grpc_service_port = os.environ.get('CONTEXTSERVICE_SERVICE_PORT_GRPC', GRPC_SERVICE_PORT )
max_workers = os.environ.get('MAX_WORKERS', GRPC_MAX_WORKERS )
grace_period = os.environ.get('GRACE_PERIOD', GRPC_GRACE_PERIOD )
log_level = os.environ.get('LOG_LEVEL', LOG_LEVEL )
restapi_service_port = os.environ.get('RESTAPI_SERVICE_PORT', RESTAPI_SERVICE_PORT)
restapi_base_url = os.environ.get('RESTAPI_BASE_URL', RESTAPI_BASE_URL )
metrics_port = os.environ.get('METRICS_PORT', METRICS_PORT )
grpc_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
restapi_service_port = get_setting('RESTAPI_SERVICE_PORT', default=RESTAPI_SERVICE_PORT)
restapi_base_url = get_setting('RESTAPI_BASE_URL', default=RESTAPI_BASE_URL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
......
import logging, os, signal, sys, threading
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting
from common.database.Factory import get_database
from device.service.DeviceService import DeviceService
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT
......@@ -15,11 +16,11 @@ def signal_handler(signal, frame):
def main():
global terminate, logger
service_port = os.environ.get('DEVICESERVICE_SERVICE_PORT_GRPC', GRPC_SERVICE_PORT)
max_workers = os.environ.get('MAX_WORKERS', GRPC_MAX_WORKERS )
grace_period = os.environ.get('GRACE_PERIOD', GRPC_GRACE_PERIOD)
log_level = os.environ.get('LOG_LEVEL', LOG_LEVEL )
metrics_port = os.environ.get('METRICS_PORT', METRICS_PORT )
service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD)
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
......
import logging, os, signal, sys, threading
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting
from common.database.Factory import get_database
from service.service.ServiceService import ServiceService
from service.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT
......@@ -15,11 +16,11 @@ def signal_handler(signal, frame):
def main():
global terminate, logger
service_port = os.environ.get('SERVICESERVICE_SERVICE_PORT_GRPC', GRPC_SERVICE_PORT)
max_workers = os.environ.get('MAX_WORKERS', GRPC_MAX_WORKERS )
grace_period = os.environ.get('GRACE_PERIOD', GRPC_GRACE_PERIOD)
log_level = os.environ.get('LOG_LEVEL', LOG_LEVEL )
metrics_port = os.environ.get('METRICS_PORT', METRICS_PORT )
service_port = get_setting('SERVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD)
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
......
import logging, os, pytest
import logging, pytest
from google.protobuf.json_format import MessageToDict
from common.Settings import get_setting
from common.database.Factory import get_database, DatabaseEngineEnum
from common.database.api.Database import Database
from common.tests.Assertions import validate_device_id, validate_link_id, validate_service_id, \
......@@ -18,11 +19,6 @@ from service.proto.service_pb2 import Service
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def get_setting(name):
value = os.environ.get(name)
if value is None: raise Exception('Unable to find variable({})'.format(name))
return value
@pytest.fixture(scope='session')
def redis_database():
_database = get_database(engine=DatabaseEngineEnum.REDIS, REDIS_DATABASE_ID=0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment