Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 370 additions and 286 deletions
...@@ -16,7 +16,7 @@ FROM python:3.9-slim ...@@ -16,7 +16,7 @@ FROM python:3.9-slim
# Install dependencies # Install dependencies
RUN apt-get --yes --quiet --quiet update && \ RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \ apt-get --yes --quiet --quiet install g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
# Download, build and install libyang. Note that APT package is outdated # Download, build and install libyang. Note that APT package is outdated
...@@ -38,11 +38,6 @@ RUN ldconfig ...@@ -38,11 +38,6 @@ RUN ldconfig
# Set Python to show logs as they occur # Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0 ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages # Get generic Python packages
RUN python3 -m pip install --upgrade pip RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel RUN python3 -m pip install --upgrade setuptools wheel
...@@ -95,4 +90,5 @@ RUN mkdir -p /var/teraflow/tests/tools ...@@ -95,4 +90,5 @@ RUN mkdir -p /var/teraflow/tests/tools
COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/
# Start the service # Start the service
ENTRYPOINT ["python", "-m", "nbi.service"] # NOTE: Configured single worker to prevent issues with multi-worker synchronization. To be invetsigated.
ENTRYPOINT ["gunicorn", "--workers", "1", "--worker-class", "eventlet", "--bind", "0.0.0.0:8080", "nbi.service.app:app"]
...@@ -2,6 +2,15 @@ ...@@ -2,6 +2,15 @@
The NBI component uses libyang to validate and process messages. Follow instructions below to install it. The NBI component uses libyang to validate and process messages. Follow instructions below to install it.
## IMPORTANT
**TL;DR**: Use kafka-python for consuming from kafka in the NBI component.
Why:
`confluent-kafka` is written in C, thus, it bypasses eventlet monkey_patches that convert normal threads into green_threads.
That implies methods such as consumer.poll() become blocking in eventlet scenario used by gunicorn web server.
## Install libyang ## Install libyang
- Ref: https://github.com/CESNET/libyang - Ref: https://github.com/CESNET/libyang
- Ref: https://github.com/CESNET/libyang-python/ - Ref: https://github.com/CESNET/libyang-python/
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.nbi_pb2_grpc import NbiServiceStub
from common.proto.context_pb2 import (
AuthenticationResult, Empty, Service, ServiceId, ServiceIdList, ServiceStatus, TeraFlowController)
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class NbiClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.NBI)
if not port: port = get_service_port_grpc(ServiceNameEnum.NBI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = NbiServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def CheckCredentials(self, request : TeraFlowController) -> AuthenticationResult:
LOGGER.debug('CheckCredentials request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.CheckCredentials(request)
LOGGER.debug('CheckCredentials result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetConnectivityServiceStatus(self, request : ServiceId) -> ServiceStatus:
LOGGER.debug('GetConnectivityServiceStatus request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetConnectivityServiceStatus(request)
LOGGER.debug('GetConnectivityServiceStatus result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def CreateConnectivityService(self, request : Service) -> ServiceId:
LOGGER.debug('CreateConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.CreateConnectivityService(request)
LOGGER.debug('CreateConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def EditConnectivityService(self, request : Service) -> ServiceId:
LOGGER.debug('EditConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.EditConnectivityService(request)
LOGGER.debug('EditConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def DeleteConnectivityService(self, request : Service) -> Empty:
LOGGER.debug('DeleteConnectivityService request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.DeleteConnectivityService(request)
LOGGER.debug('DeleteConnectivityService result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetAllActiveConnectivityServices(self, request : Empty) -> ServiceIdList:
LOGGER.debug('GetAllActiveConnectivityServices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetAllActiveConnectivityServices(request)
LOGGER.debug('GetAllActiveConnectivityServices result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def ClearAllConnectivityServices(self, request : Empty) -> Empty:
LOGGER.debug('ClearAllConnectivityServices request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.ClearAllConnectivityServices(request)
LOGGER.debug('ClearAllConnectivityServices result: {:s}'.format(grpc_message_to_json_string(response)))
return response
...@@ -12,17 +12,26 @@ ...@@ -12,17 +12,26 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
confluent-kafka==2.3.* # only for creating topics and compatibility
deepdiff==6.7.* deepdiff==6.7.*
deepmerge==1.1.* deepmerge==1.1.*
eventlet==0.39.0
Flask==2.1.3 Flask==2.1.3
Flask-HTTPAuth==4.5.0 Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9 Flask-RESTful==0.3.9
flask-socketio==5.5.1
#gevent==24.11.1
#gevent-websocket==0.10.1
#greenlet==3.1.1
gunicorn==23.0.0
jsonschema==4.4.0 jsonschema==4.4.0
kafka-python==2.0.6 # for publishing and consuming messages in an eventlet-compatible way
libyang==2.8.4 libyang==2.8.4
netaddr==0.9.0 netaddr==0.9.0
pyang==2.6.0 pyang==2.6.0
git+https://github.com/robshakir/pyangbind.git git+https://github.com/robshakir/pyangbind.git
pydantic==2.6.3 pydantic==2.6.3
python-socketio==5.12.1
requests==2.27.1 requests==2.27.1
werkzeug==2.3.7 werkzeug==2.3.7
websockets==12.0 #websockets==12.0
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
export FLASK_ENV=development
gunicorn -w 4 --worker-class eventlet -b 0.0.0.0:18080 --log-level DEBUG nbi.service.app:app
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, socketio, time
from typing import Any, List, Optional, Tuple
from flask import Flask, request
from flask_restful import Api, Resource
from flask_socketio import Namespace, SocketIO
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from nbi.Config import SECRET_KEY
LOGGER = logging.getLogger(__name__)
def log_request(response):
timestamp = time.strftime('[%Y-%b-%d %H:%M]')
LOGGER.info(
'%s %s %s %s %s', timestamp, request.remote_addr, request.method,
request.full_path, response.status
)
return response
class NbiApplication:
def __init__(self, base_url : Optional[str] = None) -> None:
if base_url is None: base_url = ''
self.base_url = base_url
self._app = Flask(__name__)
self._app.config['SECRET_KEY'] = SECRET_KEY
self._app.after_request(log_request)
self._api = Api(self._app, prefix=base_url)
# Configure KafkaManager to enable SocketIO Servers running in different
# gunicorn workers to self-coordinate and share sessions.
#self._sio_client_manager = socketio.KafkaManager(
# url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()),
# channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value
#)
self._sio = SocketIO(
self._app, cors_allowed_origins='*', async_mode='eventlet',
#client_manager=self._sio_client_manager,
logger=True, engineio_logger=True
)
def add_rest_api_resource(self, resource_class : Resource, *urls, **kwargs) -> None:
self._api.add_resource(resource_class, *urls, **kwargs)
def add_rest_api_resources(self, resources : List[Tuple[Resource, str, str]]) -> None:
for endpoint_name, resource_class, resource_url in resources:
self.add_rest_api_resource(resource_class, resource_url, endpoint=endpoint_name)
def add_websocket_namespace(self, namespace : Namespace) -> None:
self._sio.on_namespace(namespace)
def websocket_emit_message(
self, event : str, *args : Any, namespace : str = '/', to : Optional[str] = None
) -> None:
self._sio.emit(event, *args, namespace=namespace, to=to)
def get_flask_app(self) -> Flask:
return self._app
def get_flask_api(self) -> Api:
return self._api
def get_socketio_server(self) -> Optional[socketio.Server]:
return self._sio.server
def dump_configuration(self) -> None:
LOGGER.debug('Configured REST-API Resources:')
for resource in self._api.resources:
LOGGER.debug(' - {:s}'.format(str(resource)))
LOGGER.debug('Configured Flask Rules:')
for rule in self._app.url_map.iter_rules():
LOGGER.debug(' - {:s}'.format(str(rule)))
LOGGER.debug('Configured SocketIO/WebSocket Namespaces:')
for handler in self._sio.handlers:
LOGGER.debug(' - {:s}'.format(str(handler)))
for namespace in self._sio.namespace_handlers:
LOGGER.debug(' - {:s}'.format(str(namespace)))
for namespace in self._sio.server.handlers:
LOGGER.debug(' - {:s}'.format(str(namespace)))
for namespace in self._sio.server.namespace_handlers:
LOGGER.debug(' - {:s}'.format(str(namespace)))
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import (
AuthenticationResult, Empty, Service, ServiceId, ServiceIdList, ServiceStatus, TeraFlowController)
from common.proto.nbi_pb2_grpc import NbiServiceServicer
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('NBI', 'RPC')
class NbiServiceServicerImpl(NbiServiceServicer):
def __init__(self):
LOGGER.info('Creating Servicer...')
LOGGER.info('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CheckCredentials(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
LOGGER.warning('NOT IMPLEMENTED')
return AuthenticationResult()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetConnectivityServiceStatus(self, request : ServiceId, context : grpc.ServicerContext) -> ServiceStatus:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceStatus()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CreateConnectivityService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceId()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def EditConnectivityService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceId()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteConnectivityService(self, request : Service, context : grpc.ServicerContext) -> Empty:
LOGGER.warning('NOT IMPLEMENTED')
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetAllActiveConnectivityServices(self, request : Empty, context : grpc.ServicerContext) -> ServiceIdList:
LOGGER.warning('NOT IMPLEMENTED')
return ServiceIdList()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ClearAllConnectivityServices(self, request : Empty, context : grpc.ServicerContext) -> Empty:
LOGGER.warning('NOT IMPLEMENTED')
return Empty()
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .NbiService import NbiService
from .rest_server.RestServer import RestServer
from .rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
from .rest_server.nbi_plugins.ietf_hardware import register_ietf_hardware
from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn
from .rest_server.nbi_plugins.ietf_network import register_ietf_network
from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss
from .rest_server.nbi_plugins.ietf_acl import register_ietf_acl
from .rest_server.nbi_plugins.qkd_app import register_qkd_app
from .rest_server.nbi_plugins.tfs_api import register_tfs_api
from .rest_server.nbi_plugins import register_restconf
from .context_subscription import register_context_subscription
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name, unused-argument
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Starting NBI service
grpc_service = NbiService()
grpc_service.start()
rest_server = RestServer()
register_etsi_bwm_api(rest_server)
register_ietf_hardware(rest_server)
register_ietf_l2vpn(rest_server) # Registering L2VPN entrypoint
register_ietf_l3vpn(rest_server) # Registering L3VPN entrypoint
register_ietf_network(rest_server)
register_ietf_nss(rest_server) # Registering NSS entrypoint
register_ietf_acl(rest_server)
register_qkd_app(rest_server)
register_tfs_api(rest_server)
register_restconf(rest_server)
rest_server.start()
register_context_subscription()
LOGGER.debug('Configured Resources:')
for resource in rest_server.api.resources:
LOGGER.debug(' - {:s}'.format(str(resource)))
LOGGER.debug('Configured Rules:')
for rule in rest_server.app.url_map.iter_rules():
LOGGER.debug(' - {:s}'.format(str(rule)))
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
grpc_service.stop()
rest_server.shutdown()
rest_server.join()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Enable eventlet for async networking
# NOTE: monkey_patch needs to be executed before importing any other module.
import eventlet
eventlet.monkey_patch()
#pylint: disable=wrong-import-position
import logging
from common.tools.kafka.Variables import KafkaTopic
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_http_bind_address, get_log_level,
get_service_baseurl_http, get_service_port_http,
wait_for_environment_variables
)
from .NbiApplication import NbiApplication
from .etsi_bwm import register_etsi_bwm_api
from .health_probes import register_health_probes
from .ietf_acl import register_ietf_acl
from .ietf_hardware import register_ietf_hardware
from .ietf_l2vpn import register_ietf_l2vpn
from .ietf_l3vpn import register_ietf_l3vpn
from .ietf_network import register_ietf_network
from .ietf_network_slice import register_ietf_nss
from .qkd_app import register_qkd_app
from .restconf_root import register_restconf_root
from .tfs_api import register_tfs_api
#from .topology_updates import register_topology_updates
from .vntm_recommend import register_vntm_recommend
from .well_known_meta import register_well_known
LOG_LEVEL = get_log_level()
logging.basicConfig(
level=LOG_LEVEL,
format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s",
)
logging.getLogger('kafka.client').setLevel(logging.WARNING)
logging.getLogger('kafka.cluster').setLevel(logging.WARNING)
logging.getLogger('kafka.conn').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING)
logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.kafka').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.record_accumulator').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.sender').setLevel(logging.WARNING)
logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING)
logging.getLogger('socketio.server').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__)
LOGGER.info('Starting...')
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
BASE_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
LOGGER.info('Creating missing Kafka topics...')
KafkaTopic.create_all_topics()
LOGGER.info('Created required Kafka topics')
nbi_app = NbiApplication(base_url=BASE_URL)
register_health_probes (nbi_app)
register_restconf_root (nbi_app)
register_well_known (nbi_app)
register_tfs_api (nbi_app)
register_etsi_bwm_api (nbi_app)
register_ietf_hardware (nbi_app)
register_ietf_l2vpn (nbi_app)
register_ietf_l3vpn (nbi_app)
register_ietf_network (nbi_app)
register_ietf_nss (nbi_app)
register_ietf_acl (nbi_app)
register_qkd_app (nbi_app)
#register_topology_updates(nbi_app) # does not work; check if eventlet-grpc side effects
register_vntm_recommend (nbi_app)
LOGGER.info('All connectors registered')
nbi_app.dump_configuration()
app = nbi_app.get_flask_app()
LOGGER.info('Initialization completed!')
if __name__ == '__main__':
# Only used to run it locally during development stage;
# otherwise, app is directly launched by gunicorn.
BIND_ADDRESS = get_http_bind_address()
BIND_PORT = get_service_port_http(ServiceNameEnum.NBI)
nbi_app._sio.run(
app, host=BIND_ADDRESS, port=BIND_PORT,
debug=True, use_reloader=False
)
...@@ -12,18 +12,19 @@ ...@@ -12,18 +12,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from nbi.service.rest_server.RestServer import RestServer from nbi.service.NbiApplication import NbiApplication
from .Resources import BwInfo, BwInfoId from .Resources import BwInfo, BwInfoId
URL_PREFIX = '/restconf/bwm/v1' URL_PREFIX = '/restconf/bwm/v1'
# Use 'path' type since some identifiers might contain char '/' and Flask is unable to recognize them in 'string' type. def register_etsi_bwm_api(nbi_app : NbiApplication):
RESOURCES = [ nbi_app.add_rest_api_resource(
# (endpoint_name, resource_class, resource_url) BwInfo,
('api.bw_info', BwInfo, '/bw_allocations'), URL_PREFIX + '/bw_allocations',
('api.bw_info_id', BwInfoId, '/bw_allocations/<path:allocationId>'), endpoint='etsi_bwm.bw_info'
] )
nbi_app.add_rest_api_resource(
def register_etsi_bwm_api(rest_server : RestServer): BwInfoId,
for endpoint_name, resource_class, resource_url in RESOURCES: URL_PREFIX + '/bw_allocations/<path:allocationId>',
rest_server.add_resource(resource_class, URL_PREFIX + resource_url, endpoint=endpoint_name) endpoint='etsi_bwm.bw_info_id'
)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
SIO_NAMESPACE = '/heartbeat'
SIO_ROOM = 'heartbeat'
START_TIME = time.time()
HEARTHBEAT_INTERVAL = 1 # second
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, socketio, threading, time
from .Constants import HEARTHBEAT_INTERVAL, SIO_NAMESPACE, SIO_ROOM, START_TIME
LOGGER = logging.getLogger(__name__)
class HeartbeatThread(threading.Thread):
def __init__(self, namespace : socketio.Namespace):
super().__init__(daemon=True)
self._terminate = threading.Event()
self._namespace = namespace
def start(self):
self._terminate.clear()
return super().start()
def stop(self) -> None:
self._terminate.set()
def run(self):
try:
LOGGER.info('[run] Running...')
while not self._terminate.is_set():
time.sleep(HEARTHBEAT_INTERVAL)
server : socketio.Server = self._namespace.server
if server is None: continue
data = {'uptime_seconds': time.time() - START_TIME}
server.emit('uptime', data, namespace=SIO_NAMESPACE, to=SIO_ROOM)
except: # pylint: disable=bare-except
LOGGER.exception('[run] Unexpected Thread Exception')
LOGGER.info('[run] Terminated')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from flask import request
from flask_socketio import Namespace, join_room, leave_room
from .Constants import SIO_NAMESPACE, SIO_ROOM
from .HeartbeatThread import HeartbeatThread
LOGGER = logging.getLogger(__name__)
class HeartbeatServerNamespace(Namespace):
def __init__(self):
super().__init__(namespace=SIO_NAMESPACE)
self._thread = HeartbeatThread(self)
#self._thread.start()
def stop_thread(self) -> None:
self._thread.stop()
def on_connect(self, auth):
MSG = '[on_connect] Client connect: sid={:s}, auth={:s}'
LOGGER.info(MSG.format(str(request.sid), str(auth)))
join_room(SIO_ROOM, namespace=SIO_NAMESPACE)
def on_disconnect(self, reason):
MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}'
LOGGER.info(MSG.format(str(request.sid), str(reason)))
leave_room(SIO_ROOM, namespace=SIO_NAMESPACE)