diff --git a/manifests/nbiservice.yaml b/manifests/nbiservice.yaml index d2d65c719d23780c31228936d9fb4893a7b247f9..3e6d1de8403c45d3ce6a2c7f5a185be84c0243b9 100644 --- a/manifests/nbiservice.yaml +++ b/manifests/nbiservice.yaml @@ -44,20 +44,23 @@ spec: value: "production" # change to "development" if developing - name: IETF_NETWORK_RENDERER value: "LIBYANG" + envFrom: + - secretRef: + name: kfk-kpi-data readinessProbe: httpGet: path: /healthz port: 8080 - initialDelaySeconds: 5 + initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap periodSeconds: 10 - failureThreshold: 3 + failureThreshold: 6 livenessProbe: httpGet: path: /healthz port: 8080 - initialDelaySeconds: 5 + initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap periodSeconds: 10 - failureThreshold: 3 + failureThreshold: 6 resources: requests: cpu: 150m diff --git a/src/nbi/Dockerfile b/src/nbi/Dockerfile index c56dff12b978ddf21d053242d96dd663af72e686..6bca1c81a5afbaf891b22c7497d230066b750e75 100644 --- a/src/nbi/Dockerfile +++ b/src/nbi/Dockerfile @@ -89,5 +89,4 @@ RUN mkdir -p /var/teraflow/tests/tools COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ # Start the service -ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "eventlet", "-b", "0.0.0.0:8080", "nbi.service.app:app"] -#ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "-b", "0.0.0.0:8080", "nbi.service.app:app"] +ENTRYPOINT ["gunicorn", "--workers", "4", "--worker-class", "eventlet", "--bind", "0.0.0.0:8080", "nbi.service.app:app"] diff --git a/src/nbi/README.md b/src/nbi/README.md index f997ce21c9b809a1749f046672e895d3ad466824..5dc5be29b9cd5f30c8783c5242fad35b47215d7b 100644 --- a/src/nbi/README.md +++ b/src/nbi/README.md @@ -2,6 +2,15 @@ The NBI component uses libyang to validate and process messages. Follow instructions below to install it. + +## IMPORTANT +**TL;DR**: Use kafka-python for consuming from kafka in the NBI component. + +Why: + +`confluent-kafka` is written in C, thus, it bypasses eventlet monkey_patches that convert normal threads into green_threads. +That implies methods such as consumer.poll() become blocking in eventlet scenario used by gunicorn web server. + ## Install libyang - Ref: https://github.com/CESNET/libyang - Ref: https://github.com/CESNET/libyang-python/ diff --git a/src/nbi/requirements.in b/src/nbi/requirements.in index 401a6de3026f4ab5896f21224e7674435553e080..e21aee4f699991f02ab535ac7d6216ac50dc3977 100644 --- a/src/nbi/requirements.in +++ b/src/nbi/requirements.in @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +confluent-kafka==2.3.* # only for creating topics and compatibility deepdiff==6.7.* deepmerge==1.1.* eventlet==0.39.0 @@ -19,16 +20,18 @@ Flask==2.1.3 Flask-HTTPAuth==4.5.0 Flask-RESTful==0.3.9 flask-socketio==5.5.1 -jsonschema==4.4.0 #gevent==24.11.1 #gevent-websocket==0.10.1 #greenlet==3.1.1 gunicorn==23.0.0 +jsonschema==4.4.0 +kafka-python==2.0.6 # for publishing and consuming messages in an eventlet-compatible way libyang==2.8.4 netaddr==0.9.0 pyang==2.6.0 git+https://github.com/robshakir/pyangbind.git pydantic==2.6.3 +python-socketio==5.12.1 requests==2.27.1 werkzeug==2.3.7 #websockets==12.0 diff --git a/src/nbi/service/app.py b/src/nbi/service/app.py index a413c6db81869149157d75a5954986ed26c5b699..a0206cdd6df679a8e7860ea49b2a7330e7f1e8df 100644 --- a/src/nbi/service/app.py +++ b/src/nbi/service/app.py @@ -20,6 +20,7 @@ eventlet.monkey_patch() #pylint: disable=wrong-import-position import logging +from common.tools.kafka.Variables import KafkaTopic from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, @@ -39,14 +40,21 @@ from .ietf_network_slice import register_ietf_nss from .qkd_app import register_qkd_app from .restconf_root import register_restconf_root from .tfs_api import register_tfs_api +#from .topology_updates import register_topology_updates +from .vntm_recommend import register_vntm_recommend from .well_known_meta import register_well_known LOG_LEVEL = get_log_level() -logging.basicConfig(level=LOG_LEVEL) +logging.basicConfig( + level=LOG_LEVEL, + format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s", +) logging.getLogger('socketio.server').setLevel(logging.WARNING) LOGGER = logging.getLogger(__name__) +LOGGER.info('Starting...') + wait_for_environment_variables([ get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), @@ -58,23 +66,32 @@ wait_for_environment_variables([ BASE_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or '' +LOGGER.info('Creating missing Kafka topics...') +KafkaTopic.create_all_topics() +LOGGER.info('Created required Kafka topics') + nbi_app = NbiApplication(base_url=BASE_URL) -register_health_probes(nbi_app) -register_restconf_root(nbi_app) -register_well_known (nbi_app) -register_tfs_api (nbi_app) -register_etsi_bwm_api (nbi_app) -register_ietf_hardware(nbi_app) -register_ietf_l2vpn (nbi_app) -register_ietf_l3vpn (nbi_app) -register_ietf_network (nbi_app) -register_ietf_nss (nbi_app) -register_ietf_acl (nbi_app) -register_qkd_app (nbi_app) +register_health_probes (nbi_app) +register_restconf_root (nbi_app) +register_well_known (nbi_app) +register_tfs_api (nbi_app) +register_etsi_bwm_api (nbi_app) +register_ietf_hardware (nbi_app) +register_ietf_l2vpn (nbi_app) +register_ietf_l3vpn (nbi_app) +register_ietf_network (nbi_app) +register_ietf_nss (nbi_app) +register_ietf_acl (nbi_app) +register_qkd_app (nbi_app) +#register_topology_updates(nbi_app) # does not work; check if eventlet-grpc side effects +register_vntm_recommend (nbi_app) +LOGGER.info('All connectors registered') nbi_app.dump_configuration() app = nbi_app.get_flask_app() +LOGGER.info('Initialization completed!') + if __name__ == '__main__': # Only used to run it locally during development stage; # otherwise, app is directly launched by gunicorn. diff --git a/src/nbi/service/health_probes/HeartbeatThread.py b/src/nbi/service/health_probes/HeartbeatThread.py index c49f4ab49fdd1d09f5747ba9680b79e22299bc11..67da0b55e2d02f429b1c16dace60c910b64e1443 100644 --- a/src/nbi/service/health_probes/HeartbeatThread.py +++ b/src/nbi/service/health_probes/HeartbeatThread.py @@ -31,20 +31,14 @@ class HeartbeatThread(threading.Thread): self._terminate.set() def run(self): - LOGGER.debug('[HeartbeatThread::run] begin') try: + LOGGER.info('[run] Running...') while not self._terminate.is_set(): - LOGGER.debug('[HeartbeatThread::run] Running...') time.sleep(HEARTHBEAT_INTERVAL) - LOGGER.debug('[HeartbeatThread::run] Interval elapsed') - server : socketio.Server = self._namespace.server if server is None: continue - - LOGGER.debug('[HeartbeatThread::run] emitting...') data = {'uptime_seconds': time.time() - START_TIME} server.emit('uptime', data, namespace=SIO_NAMESPACE, to=SIO_ROOM) - LOGGER.debug('[HeartbeatThread::run] emitted') except: # pylint: disable=bare-except - LOGGER.exception('[HeartbeatThread::run] thread failed') - LOGGER.debug('[HeartbeatThread::run] end') + LOGGER.exception('[run] Unexpected Thread Exception') + LOGGER.info('[run] Terminated') diff --git a/src/nbi/service/health_probes/Namespaces.py b/src/nbi/service/health_probes/Namespaces.py index 9f7517d9b2a634cf21aaa73eed2b49d44dbd507b..8a3f9323c3312bd8376bb9839f7ce09c377d22ec 100644 --- a/src/nbi/service/health_probes/Namespaces.py +++ b/src/nbi/service/health_probes/Namespaces.py @@ -30,11 +30,11 @@ class HeartbeatServerNamespace(Namespace): self._thread.stop() def on_connect(self, auth): - MSG = '[HeartbeatServerNamespace::on_connect] Client connect: sid={:s}, auth={:s}' + MSG = '[on_connect] Client connect: sid={:s}, auth={:s}' LOGGER.info(MSG.format(str(request.sid), str(auth))) join_room(SIO_ROOM, namespace=SIO_NAMESPACE) def on_disconnect(self, reason): - MSG = '[HeartbeatServerNamespace::on_disconnect] Client disconnect: sid={:s}, reason={:s}' + MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' LOGGER.info(MSG.format(str(request.sid), str(reason))) leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) diff --git a/src/nbi/service/topology_updates/Constants.py b/src/nbi/service/topology_updates/Constants.py new file mode 100644 index 0000000000000000000000000000000000000000..3e4dada3a736ca9922f86108b38fec53e6be1295 --- /dev/null +++ b/src/nbi/service/topology_updates/Constants.py @@ -0,0 +1,17 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +SIO_NAMESPACE = '/topo-updates' +SIO_ROOM = 'topo-updates' diff --git a/src/nbi/service/topology_updates/Namespaces.py b/src/nbi/service/topology_updates/Namespaces.py new file mode 100644 index 0000000000000000000000000000000000000000..963a1f26c4cf188debe654494c1de40e665d9ced --- /dev/null +++ b/src/nbi/service/topology_updates/Namespaces.py @@ -0,0 +1,63 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from flask import request +from flask_socketio import Namespace, join_room, leave_room +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.proto.context_pb2 import TopologyId +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Topology import json_topology_id +from context.client.ContextClient import ContextClient +from .Constants import SIO_NAMESPACE, SIO_ROOM +from .TopoUpdatesThread import TopoUpdatesThread + +LOGGER = logging.getLogger(__name__) + +class TopoUpdatesServerNamespace(Namespace): + def __init__(self): + super().__init__(namespace=SIO_NAMESPACE) + self._thread = TopoUpdatesThread(self) + self._thread.start() + + def stop_thread(self) -> None: + self._thread.stop() + + def on_connect(self, auth): + MSG = '[on_connect] Client connect: sid={:s}, auth={:s}' + LOGGER.info(MSG.format(str(request.sid), str(auth))) + join_room(SIO_ROOM, namespace=SIO_NAMESPACE) + + LOGGER.debug('[on_connect] emitting topology snapshot...') + + context_id = json_context_id(DEFAULT_CONTEXT_NAME) + topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id) + + try: + context_client = ContextClient() + topology_details = context_client.GetTopologyDetails( + TopologyId(**topology_id) + ) + except: # pylint: disable=bare-except + MSG = 'Unable to retrieve topology snapshot: {:s}' + LOGGER.exception(MSG.format(str(topology_id))) + else: + topology_snapshot = grpc_message_to_json_string(topology_details) + self.emit('topology-snapshot', topology_snapshot) + + def on_disconnect(self, reason): + MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' + LOGGER.info(MSG.format(str(request.sid), str(reason))) + leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) diff --git a/src/nbi/service/topology_updates/TopoUpdatesThread.py b/src/nbi/service/topology_updates/TopoUpdatesThread.py new file mode 100644 index 0000000000000000000000000000000000000000..dd9f96cd6ee2c3130b1c60f3b56df0583df0f6fd --- /dev/null +++ b/src/nbi/service/topology_updates/TopoUpdatesThread.py @@ -0,0 +1,92 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, socketio, threading +from common.proto.context_pb2 import TopologyId +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Topology import json_topology_id +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from .Constants import SIO_NAMESPACE, SIO_ROOM + +LOGGER = logging.getLogger(__name__) + +ADMIN_TOPOLOGY_ID = TopologyId( + **json_topology_id( + DEFAULT_TOPOLOGY_NAME, + context_id=json_context_id(DEFAULT_CONTEXT_NAME) + ) +) + +GET_EVENT_TIMEOUT = 1.0 + +class TopoUpdatesThread(threading.Thread): + def __init__(self, namespace : socketio.Namespace): + super().__init__(daemon=True) + self._terminate = threading.Event() + self._namespace = namespace + + def start(self): + self._terminate.clear() + return super().start() + + def stop(self) -> None: + self._terminate.set() + + def run(self): + LOGGER.info('[run] Starting...') + try: + context_client = ContextClient() + events_collector = EventsCollector( + context_client, + log_events_received = True, + activate_context_collector = True, + activate_topology_collector = True, + activate_device_collector = True, + activate_link_collector = True, + activate_service_collector = False, + activate_slice_collector = False, + activate_connection_collector = False, + ) + events_collector.start() + + LOGGER.info('[run] Running...') + while not self._terminate.is_set(): + event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue + MSG = '[run] Event: {:s}' + LOGGER.debug(MSG.format(grpc_message_to_json_string(event))) + + # TODO: ideally, each event should trigger a notification containing + # the type of event and the relevant data for the event. To simplify, + # for now, the entire topology is sent. + + topology_details = context_client.GetTopologyDetails(ADMIN_TOPOLOGY_ID) + topology_update = grpc_message_to_json_string(topology_details) + + LOGGER.debug('[run] checking server namespace...') + server : socketio.Server = self._namespace.server + if server is None: continue + + LOGGER.debug('[run] emitting topology update...') + server.emit('topology-update', topology_update, namespace=SIO_NAMESPACE, to=SIO_ROOM) + LOGGER.debug('[run] emitted') + + LOGGER.info('[run] Exiting') + events_collector.stop() + except: # pylint: disable=bare-except + LOGGER.exception('[run] Unexpected Thread Exception') + LOGGER.info('[run] Terminated') diff --git a/src/nbi/service/topology_updates/__init__.py b/src/nbi/service/topology_updates/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6a3d33228a8cab317b70c4d3185544f0d54e5d41 --- /dev/null +++ b/src/nbi/service/topology_updates/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from nbi.service.NbiApplication import NbiApplication +from .Namespaces import TopoUpdatesServerNamespace + +def register_topology_updates(nbi_app : NbiApplication): + nbi_app.add_websocket_namespace(TopoUpdatesServerNamespace()) diff --git a/src/nbi/service/vntm_recommend/Constants.py b/src/nbi/service/vntm_recommend/Constants.py new file mode 100644 index 0000000000000000000000000000000000000000..99438dac39a22b7951b6ca3f058ff3eb10de108d --- /dev/null +++ b/src/nbi/service/vntm_recommend/Constants.py @@ -0,0 +1,17 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +SIO_NAMESPACE = '/vnt-recomm' +SIO_ROOM = 'vnt-recomm' diff --git a/src/nbi/service/vntm_recommend/Namespaces.py b/src/nbi/service/vntm_recommend/Namespaces.py new file mode 100644 index 0000000000000000000000000000000000000000..c4cb211a6ba3b8bbf2305f32f40443f071afcb61 --- /dev/null +++ b/src/nbi/service/vntm_recommend/Namespaces.py @@ -0,0 +1,40 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from flask import request +from flask_socketio import Namespace, join_room, leave_room +from .Constants import SIO_NAMESPACE, SIO_ROOM +from .VntRecommThread import VntRecommThread + +LOGGER = logging.getLogger(__name__) + +class VntRecommServerNamespace(Namespace): + def __init__(self): + super().__init__(namespace=SIO_NAMESPACE) + self._thread = VntRecommThread(self) + self._thread.start() + + def stop_thread(self) -> None: + self._thread.stop() + + def on_connect(self, auth): + MSG = '[on_connect] Client connect: sid={:s}, auth={:s}' + LOGGER.info(MSG.format(str(request.sid), str(auth))) + join_room(SIO_ROOM, namespace=SIO_NAMESPACE) + + def on_disconnect(self, reason): + MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' + LOGGER.info(MSG.format(str(request.sid), str(reason))) + leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) diff --git a/src/nbi/service/vntm_recommend/VntRecommThread.py b/src/nbi/service/vntm_recommend/VntRecommThread.py new file mode 100644 index 0000000000000000000000000000000000000000..d5248751300a15bba8e9ab7ce3b3e52c2f5a1346 --- /dev/null +++ b/src/nbi/service/vntm_recommend/VntRecommThread.py @@ -0,0 +1,86 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, socketio, threading +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from kafka import KafkaConsumer +from .Constants import SIO_NAMESPACE, SIO_ROOM + +logging.getLogger('kafka.client').setLevel(logging.WARNING) +logging.getLogger('kafka.cluster').setLevel(logging.WARNING) +logging.getLogger('kafka.conn').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING) +logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING) +logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING) + +LOGGER = logging.getLogger(__name__) + +class VntRecommThread(threading.Thread): + def __init__(self, namespace : socketio.Namespace): + super().__init__(daemon=True) + self._terminate = threading.Event() + self._namespace = namespace + + def start(self): + self._terminate.clear() + return super().start() + + def stop(self) -> None: + self._terminate.set() + + def run(self): + LOGGER.info('[run] Starting...') + try: + kafka_consumer = KafkaConsumer( + bootstrap_servers = KafkaConfig.get_kafka_address(), + group_id = None, # consumer dispatch all messages sent to subscribed topics + auto_offset_reset = 'latest', + ) + + LOGGER.info('[run] Subscribing...') + kafka_consumer.subscribe(topics=[KafkaTopic.VNTMANAGER_REQUEST.value]) + LOGGER.info('[run] Subscribed') + + while not self._terminate.is_set(): + records = kafka_consumer.poll(timeout_ms=1000, max_records=1) + if len(records) == 0: continue # no pending messages... continuing + + MSG = '[run] records={:s}' + LOGGER.debug(MSG.format(str(records))) + raise NotImplementedError('parse kafka records and extract recommendation') + + #if vntm_request.error(): + # if vntm_request.error().code() == KafkaError._PARTITION_EOF: continue + # MSG = '[run] Consumer error: {:s}' + # LOGGER.error(MSG.format(str(vntm_request.error()))) + # break + #message_key = vntm_request.key().decode('utf-8') + #message_value = vntm_request.value().decode('utf-8') + #MSG = '[run] Recommendation: key={:s} value={:s}' + #LOGGER.debug(MSG.format(str(message_key), str(message_value))) + # + #LOGGER.debug('[run] checking server namespace...') + #server : socketio.Server = self._namespace.server + #if server is None: continue + #LOGGER.debug('[run] emitting recommendation...') + #server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM) + #LOGGER.debug('[run] emitted') + + LOGGER.info('[run] Closing...') + kafka_consumer.close() + except: # pylint: disable=bare-except + LOGGER.exception('[run] Unexpected Thread Exception') + LOGGER.info('[run] Terminated') diff --git a/src/nbi/service/vntm_recommend/__init__.py b/src/nbi/service/vntm_recommend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..21deb31f0f2571ab3d6f8f0c563bf70b693e7a79 --- /dev/null +++ b/src/nbi/service/vntm_recommend/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from nbi.service.NbiApplication import NbiApplication +from .Namespaces import VntRecommServerNamespace + +def register_vntm_recommend(nbi_app : NbiApplication): + nbi_app.add_websocket_namespace(VntRecommServerNamespace())