Skip to content
Snippets Groups Projects
Commit ebd836d3 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

NBI component:

- Updated to use gunicorn as WGSI HTTP server
- Removed gRPC client/server as it was unused
- Added WebSocket Flask-SocketIO
- Implemented Heartbeat endpoint based on WebSocket
- Other minor framework and WebUI fixes
parent e94c8d32
No related branches found
No related tags found
3 merge requests!359Release TeraFlowSDN 5.0,!322Resolve "(CTTC) Update NBI WebSocket endpoints to Flask-SocketIO and use gunicorn",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
Showing with 193 additions and 318 deletions
...@@ -23,9 +23,6 @@ spec: ...@@ -23,9 +23,6 @@ spec:
replicas: 1 replicas: 1
template: template:
metadata: metadata:
annotations:
config.linkerd.io/skip-inbound-ports: "8762"
config.linkerd.io/skip-outbound-ports: "8762"
labels: labels:
app: nbiservice app: nbiservice
spec: spec:
...@@ -36,22 +33,28 @@ spec: ...@@ -36,22 +33,28 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
ports: ports:
- containerPort: 8080 - containerPort: 8080
- containerPort: 9090
- containerPort: 9192 - containerPort: 9192
- containerPort: 8761
env: env:
- name: LOG_LEVEL - name: LOG_LEVEL
value: "INFO" value: "INFO"
- name: FLASK_ENV
value: "production" # change to "development" if developing
- name: IETF_NETWORK_RENDERER - name: IETF_NETWORK_RENDERER
value: "LIBYANG" value: "LIBYANG"
- name: WS_IP_PORT
value: "8761"
readinessProbe: readinessProbe:
exec: httpGet:
command: ["/bin/grpc_health_probe", "-addr=:9090"] path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 3
livenessProbe: livenessProbe:
exec: httpGet:
command: ["/bin/grpc_health_probe", "-addr=:9090"] path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 3
resources: resources:
requests: requests:
cpu: 50m cpu: 50m
...@@ -75,15 +78,7 @@ spec: ...@@ -75,15 +78,7 @@ spec:
protocol: TCP protocol: TCP
port: 8080 port: 8080
targetPort: 8080 targetPort: 8080
- name: grpc
protocol: TCP
port: 9090
targetPort: 9090
- name: metrics - name: metrics
protocol: TCP protocol: TCP
port: 9192 port: 9192
targetPort: 9192 targetPort: 9192
- name: websocket
protocol: TCP
port: 8761
targetPort: 8761
// Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package nbi;
import "context.proto";
service NbiService {
rpc CheckCredentials (context.TeraFlowController) returns (context.AuthenticationResult) {}
rpc GetConnectivityServiceStatus (context.ServiceId ) returns (context.ServiceStatus ) {}
rpc CreateConnectivityService (context.Service ) returns (context.ServiceId ) {}
rpc EditConnectivityService (context.Service ) returns (context.ServiceId ) {}
rpc DeleteConnectivityService (context.Service ) returns (context.Empty ) {}
rpc GetAllActiveConnectivityServices (context.Empty ) returns (context.ServiceIdList ) {}
rpc ClearAllConnectivityServices (context.Empty ) returns (context.Empty ) {}
}
...@@ -87,7 +87,6 @@ DEFAULT_SERVICE_GRPC_PORTS = { ...@@ -87,7 +87,6 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.POLICY .value : 6060, ServiceNameEnum.POLICY .value : 6060,
ServiceNameEnum.MONITORING .value : 7070, ServiceNameEnum.MONITORING .value : 7070,
ServiceNameEnum.DLT .value : 8080, ServiceNameEnum.DLT .value : 8080,
ServiceNameEnum.NBI .value : 9090,
ServiceNameEnum.L3_CAD .value : 10001, ServiceNameEnum.L3_CAD .value : 10001,
ServiceNameEnum.L3_AM .value : 10002, ServiceNameEnum.L3_AM .value : 10002,
ServiceNameEnum.DBSCANSERVING .value : 10008, ServiceNameEnum.DBSCANSERVING .value : 10008,
......
...@@ -62,7 +62,7 @@ unit_test nbi: ...@@ -62,7 +62,7 @@ unit_test nbi:
fi fi
script: script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 9090:9090 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - docker run --name $IMAGE_NAME -d -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5 - sleep 5
- docker ps -a - docker ps -a
- docker logs $IMAGE_NAME - docker logs $IMAGE_NAME
......
...@@ -16,7 +16,7 @@ FROM python:3.9-slim ...@@ -16,7 +16,7 @@ FROM python:3.9-slim
# Install dependencies # Install dependencies
RUN apt-get --yes --quiet --quiet update && \ RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \ apt-get --yes --quiet --quiet install g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
# Download, build and install libyang. Note that APT package is outdated # Download, build and install libyang. Note that APT package is outdated
...@@ -37,23 +37,11 @@ RUN ldconfig ...@@ -37,23 +37,11 @@ RUN ldconfig
# Set Python to show logs as they occur # Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0 ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages # Get generic Python packages
RUN python3 -m pip install --upgrade pip RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory # Add common files into working directory
WORKDIR /var/teraflow/common WORKDIR /var/teraflow/common
COPY src/common/. ./ COPY src/common/. ./
...@@ -94,4 +82,5 @@ RUN mkdir -p /var/teraflow/tests/tools ...@@ -94,4 +82,5 @@ RUN mkdir -p /var/teraflow/tests/tools
COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/
# Start the service # Start the service
ENTRYPOINT ["python", "-m", "nbi.service"] #ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "eventlet", "-b", "0.0.0.0:8080", "nbi.service:nbi_app"]
ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "-b", "0.0.0.0:8080", "nbi.service.app:nbi_app"]
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.nbi_pb2_grpc import NbiServiceStub
from common.proto.context_pb2 import (
AuthenticationResult, Empty, Service, ServiceId, ServiceIdList, ServiceStatus, TeraFlowController)
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class NbiClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.NBI)
if not port: port = get_service_port_grpc(ServiceNameEnum.NBI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = NbiServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def CheckCredentials(self, request : TeraFlowController) -> AuthenticationResult:
LOGGER.debug('CheckCredentials request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.CheckCredentials(request)
LOGGER.debug('CheckCredentials result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetConnectivityServiceStatus(self, request : ServiceId) -> ServiceStatus:
LOGGER.debug('GetConnectivityServiceStatus request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetConnectivityServiceStatus(request)
LOGGER.debug('GetConnectivityServiceStatus result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def CreateConnectivityService(self, request : Service) -> ServiceId:
LOGGER.debug('CreateConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.CreateConnectivityService(request)
LOGGER.debug('CreateConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def EditConnectivityService(self, request : Service) -> ServiceId:
LOGGER.debug('EditConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.EditConnectivityService(request)
LOGGER.debug('EditConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def DeleteConnectivityService(self, request : Service) -> Empty:
LOGGER.debug('DeleteConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.DeleteConnectivityService(request)
LOGGER.debug('DeleteConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetAllActiveConnectivityServices(self, request : Empty) -> ServiceIdList:
LOGGER.debug('GetAllActiveConnectivityServices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetAllActiveConnectivityServices(request)
LOGGER.debug('GetAllActiveConnectivityServices result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def ClearAllConnectivityServices(self, request : Empty) -> Empty:
LOGGER.debug('ClearAllConnectivityServices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.ClearAllConnectivityServices(request)
LOGGER.debug('ClearAllConnectivityServices result: {:s}'.format(grpc_message_to_json_string(response)))
return response
...@@ -14,10 +14,14 @@ ...@@ -14,10 +14,14 @@
deepdiff==6.7.* deepdiff==6.7.*
deepmerge==1.1.* deepmerge==1.1.*
eventlet
Flask==2.1.3 Flask==2.1.3
Flask-HTTPAuth==4.5.0 Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9 Flask-RESTful==0.3.9
flask-socketio
jsonschema==4.4.0 jsonschema==4.4.0
gevent
gunicorn
libyang==2.8.4 libyang==2.8.4
netaddr==0.9.0 netaddr==0.9.0
pyang==2.6.0 pyang==2.6.0
...@@ -25,4 +29,19 @@ git+https://github.com/robshakir/pyangbind.git ...@@ -25,4 +29,19 @@ git+https://github.com/robshakir/pyangbind.git
pydantic==2.6.3 pydantic==2.6.3
requests==2.27.1 requests==2.27.1
werkzeug==2.3.7 werkzeug==2.3.7
websockets==12.0 #websockets==12.0
# from common_requirements; take required ones
#coverage==6.3
#grpcio==1.47.*
#grpcio-health-checking==1.47.*
#grpcio-reflection==1.47.*
#grpcio-tools==1.47.*
#grpclib==0.4.4
#prettytable==3.5.0
#prometheus-client==0.13.0
#protobuf==3.20.*
#pytest==6.2.5
#pytest-benchmark==3.4.1
#python-dateutil==2.8.2
#pytest-depends==1.0.1
import logging, time
from typing import Any, Optional
from flask import Flask, request
from flask_restful import Api, Resource
from flask_socketio import Namespace, SocketIO
LOGGER = logging.getLogger(__name__)
def log_request(response):
timestamp = time.strftime('[%Y-%b-%d %H:%M]')
LOGGER.info(
'%s %s %s %s %s', timestamp, request.remote_addr, request.method,
request.full_path, response.status
)
return response
class NbiApplication:
def __init__(self, base_url : Optional[str] = None) -> None:
if base_url is None: base_url = ''
self.base_url = base_url
self.app = Flask(__name__)
self.app.after_request(log_request)
self.api = Api(self.app, prefix=base_url)
#websocket_path = '/'.join([base_url.rstrip('/'), 'websocket'])
self.sio = SocketIO(self.app, path=base_url, cors_allowed_origins="*")
def add_rest_api_resource(self, resource_class : Resource, *urls, **kwargs) -> None:
self.api.add_resource(resource_class, *urls, **kwargs)
def add_websocket_namespace(self, namespace_class : Namespace, namespace_url : str) -> None:
self.sio.on_namespace(namespace_class(namespace=namespace_url))
def websocket_emit_message(
self, event : str, *args : Any, namespace : str = "/", to : Optional[str] = None
) -> None:
self.sio.emit(event, *args, namespace=namespace, to=to)
def dump_configuration(self) -> None:
LOGGER.debug('Configured Resources:')
for resource in self.api.resources:
LOGGER.debug(' - {:s}'.format(str(resource)))
LOGGER.debug('Configured Rules:')
for rule in self.app.url_map.iter_rules():
LOGGER.debug(' - {:s}'.format(str(rule)))
def run_standalone(
self, bind_address : Optional[str] = None, bind_port : Optional[int] = None
) -> None:
# Run method used when started in a standalone mode, i.e., outside gunicorn or
# similar WSGI HTTP servers. Otherwise, use mechanism defined by the used
# WSGI HTTP server.
#logging.getLogger('werkzeug').setLevel(logging.WARNING)
endpoint = 'http://{:s}:{:s}'.format(str(bind_address), str(bind_port))
if self.base_url is not None:
endpoint = '/'.join([endpoint.rstrip('/'), self.base_url])
LOGGER.info('Listening on {:s}...'.format(endpoint))
self.sio.run(self.app, host=bind_address, port=bind_port)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.proto.nbi_pb2_grpc import add_NbiServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from nbi.service.NbiServiceServicerImpl import NbiServiceServicerImpl
class NbiService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.NBI)
super().__init__(port, cls_name=cls_name)
self.nbi_servicer = NbiServiceServicerImpl()
def install_servicers(self):
add_NbiServiceServicer_to_server(self.nbi_servicer, self.server)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import (
AuthenticationResult, Empty, Service, ServiceId, ServiceIdList, ServiceStatus, TeraFlowController)
from common.proto.nbi_pb2_grpc import NbiServiceServicer
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('NBI', 'RPC')
class NbiServiceServicerImpl(NbiServiceServicer):
def __init__(self):
LOGGER.info('Creating Servicer...')
LOGGER.info('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CheckCredentials(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
LOGGER.warning('NOT IMPLEMENTED')
return AuthenticationResult()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetConnectivityServiceStatus(self, request : ServiceId, context : grpc.ServicerContext) -> ServiceStatus:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceStatus()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CreateConnectivityService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceId()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def EditConnectivityService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceId()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteConnectivityService(self, request : Service, context : grpc.ServicerContext) -> Empty:
LOGGER.warning('NOT IMPLEMENTED')
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetAllActiveConnectivityServices(self, request : Empty, context : grpc.ServicerContext) -> ServiceIdList:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceIdList()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ClearAllConnectivityServices(self, request : Empty, context : grpc.ServicerContext) -> Empty:
LOGGER.warning('NOT IMPLEMENTED')
return Empty()
...@@ -12,16 +12,17 @@ ...@@ -12,16 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, signal, sys, threading import logging
from typing import Optional
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port, get_env_var_name, get_http_bind_address, get_log_level,
get_metrics_port, get_service_baseurl_http, get_service_port_http,
wait_for_environment_variables wait_for_environment_variables
) )
from .NbiService import NbiService from .NbiApplication import NbiApplication
from .rest_server.RestServer import RestServer
from .rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api from .rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
from .rest_server.nbi_plugins.ietf_hardware import register_ietf_hardware from .rest_server.nbi_plugins.ietf_hardware import register_ietf_hardware
from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
...@@ -32,22 +33,34 @@ from .rest_server.nbi_plugins.ietf_acl import register_ietf_acl ...@@ -32,22 +33,34 @@ from .rest_server.nbi_plugins.ietf_acl import register_ietf_acl
from .rest_server.nbi_plugins.qkd_app import register_qkd_app from .rest_server.nbi_plugins.qkd_app import register_qkd_app
from .rest_server.nbi_plugins.tfs_api import register_tfs_api from .rest_server.nbi_plugins.tfs_api import register_tfs_api
from .rest_server.nbi_plugins import register_restconf from .rest_server.nbi_plugins import register_restconf
from .context_subscription import register_context_subscription from .websocket_namespaces.hearthbeat import register_heartbeat
terminate = threading.Event()
LOGGER = None LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL)
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name, unused-argument LOGGER = logging.getLogger(__name__)
LOGGER.warning('Terminate signal received')
terminate.set() BIND_ADDRESS = get_http_bind_address()
BIND_PORT = get_service_port_http(ServiceNameEnum.NBI)
def main(): BASE_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
global LOGGER # pylint: disable=global-statement
REGISTER_METHODS = [
log_level = get_log_level() register_etsi_bwm_api,
logging.basicConfig(level=log_level) register_ietf_hardware,
LOGGER = logging.getLogger(__name__) register_ietf_l2vpn,
register_ietf_l3vpn,
register_ietf_network,
register_ietf_nss,
register_ietf_acl,
register_qkd_app,
register_tfs_api,
register_restconf,
register_heartbeat,
]
def configure_nbi(
base_url : Optional[str] = None
) -> NbiApplication:
wait_for_environment_variables([ wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
...@@ -57,52 +70,25 @@ def main(): ...@@ -57,52 +70,25 @@ def main():
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]) ])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...') LOGGER.info('Starting...')
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Starting NBI service nbi_app = NbiApplication(base_url=base_url)
grpc_service = NbiService()
grpc_service.start()
rest_server = RestServer()
register_etsi_bwm_api(rest_server)
register_ietf_hardware(rest_server)
register_ietf_l2vpn(rest_server) # Registering L2VPN entrypoint
register_ietf_l3vpn(rest_server) # Registering L3VPN entrypoint
register_ietf_network(rest_server)
register_ietf_nss(rest_server) # Registering NSS entrypoint
register_ietf_acl(rest_server)
register_qkd_app(rest_server)
register_tfs_api(rest_server)
register_restconf(rest_server)
rest_server.start()
register_context_subscription()
LOGGER.debug('Configured Resources:')
for resource in rest_server.api.resources:
LOGGER.debug(' - {:s}'.format(str(resource)))
LOGGER.debug('Configured Rules:')
for rule in rest_server.app.url_map.iter_rules():
LOGGER.debug(' - {:s}'.format(str(rule)))
# Wait for Ctrl+C or termination signal for register_method in REGISTER_METHODS:
while not terminate.wait(timeout=1.0): pass register_method(nbi_app)
LOGGER.info('Terminating...') nbi_app.dump_configuration()
grpc_service.stop()
rest_server.shutdown()
rest_server.join()
LOGGER.info('Bye') return nbi_app
return 0
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) # Only used to run it locally during development stage;
# otherwise, app is directly launched by gunicorn.
_nbi_app = configure_nbi(base_url=BASE_URL)
_nbi_app.run_standalone(
bind_address=BIND_ADDRESS, bind_port=BIND_PORT
)
import logging, threading, time
from flask import request
from flask_socketio import Namespace, join_room, leave_room
from nbi.service.NbiApplication import NbiApplication
LOGGER = logging.getLogger(__name__)
NAMESPACE_NAME = 'heartbeat'
NAMESPACE_URL = '/heartbeat'
# WebSocket Heartbeat Namespace for debugging purposes
class DebugHeartbeatHandler(Namespace):
def on_connect(self):
LOGGER.debug('Client {:s} connected'.format(str(request.sid)))
join_room(NAMESPACE_NAME)
def on_disconnect(self, reason):
LOGGER.debug('Client {:s} disconnected: reason={:s}'.format(
str(request.sid), str(reason)
))
leave_room(NAMESPACE_NAME)
class DebugHeartbeatThread(threading.Thread):
INTERVAL = 1 # second
def __init__(self, nbi_app : NbiApplication):
super().__init__(daemon=True)
self.nbi_app = nbi_app
def run(self):
interval = DebugHeartbeatThread.INTERVAL
start_time = time.time()
while True:
time.sleep(interval)
uptime = time.time() - start_time
self.nbi_app.websocket_emit_message(
'uptime', {'uptime_seconds': uptime},
namespace=NAMESPACE_URL, to=NAMESPACE_NAME
)
NAMESPACE_DESCRIPTOR = (NAMESPACE_NAME, DebugHeartbeatHandler, NAMESPACE_URL)
...@@ -12,12 +12,10 @@ ...@@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
from common.tools.service.GenericRestServer import GenericRestServer
class RestServer(GenericRestServer): from nbi.service.NbiApplication import NbiApplication
def __init__(self, cls_name: str = __name__) -> None: from .HeartbeatNamespace import NAMESPACE_DESCRIPTOR
bind_port = get_service_port_http(ServiceNameEnum.NBI)
base_url = get_service_baseurl_http(ServiceNameEnum.NBI) def register_heartbeat(nbi_app : NbiApplication):
super().__init__(bind_port, base_url, cls_name=cls_name) _, namespace_class, namespace_url = NAMESPACE_DESCRIPTOR
nbi_app.add_websocket_namespace(namespace_class, namespace_url)
...@@ -137,6 +137,6 @@ def create_app(use_config=None, web_app_root=None): ...@@ -137,6 +137,6 @@ def create_app(use_config=None, web_app_root=None):
'is_deployed_slice' : is_deployed_slice, 'is_deployed_slice' : is_deployed_slice,
}) })
if web_app_root is not None: if web_app_root is not None and len(web_app_root) > 0:
app.wsgi_app = SetSubAppMiddleware(app.wsgi_app, web_app_root) app.wsgi_app = SetSubAppMiddleware(app.wsgi_app, web_app_root)
return app return app
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment