Commit 33abdf49 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'fix/54-notificatoins-from-context-are-incomplete' into 'develop'

Resolve "Notificatoins from Context are incomplete"

See merge request !147
parents 38233dfc 2af56d77
Loading
Loading
Loading
Loading
+21 −17
Original line number Diff line number Diff line
@@ -52,9 +52,11 @@ class NatsBackendThread(threading.Thread):
        self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event,
        ready_event : threading.Event
    ) -> None:
        try:
            LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri)))
            client = await nats.connect(servers=[self._nats_uri])
        LOGGER.info('[_run_subscriber] Connected!')
            server_version = client.connected_server_version
            LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version))))
            subscription = await client.subscribe(topic_name)
            LOGGER.info('[_run_subscriber] Subscribed!')
            ready_event.set()
@@ -68,6 +70,8 @@ class NatsBackendThread(threading.Thread):
                out_queue.put(Message(message.subject, message.data.decode('UTF-8')))
            await subscription.unsubscribe()
            await client.drain()
        except Exception:   # pylint: disable=broad-exception-caught
            LOGGER.exception('[_run_subscriber] Unhandled Exception')

    def subscribe(
        self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
@@ -79,7 +83,7 @@ class NatsBackendThread(threading.Thread):
        self._tasks.append(task)
        LOGGER.info('[subscribe] Waiting for subscriber to be ready...')
        is_ready = ready_event.wait(timeout=120)
        LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready)))
        LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready)))

    def run(self) -> None:
        asyncio.set_event_loop(self._event_loop)
+0 −4
Original line number Diff line number Diff line
@@ -28,10 +28,6 @@ TOPIC_SERVICE = 'service'
TOPIC_SLICE      = 'slice'
TOPIC_TOPOLOGY   = 'topology'

TOPICS = {
    TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY
}

CONSUME_TIMEOUT = 0.5 # seconds

class Message(NamedTuple):
+21 −3
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
import grpc, logging
from typing import List, Optional
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import ContextId, Topology, TopologyId
from common.proto.context_pb2 import ContextId, Topology, TopologyDetails, TopologyId
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology
from context.client.ContextClient import ContextClient
@@ -23,13 +23,13 @@ from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)

def create_topology(
    context_client : ContextClient, context_uuid : str, topology_uuid : str
    context_client : ContextClient, context_uuid : str, topology_uuid : str, name : Optional[str] = None
) -> None:
    context_id = ContextId(**json_context_id(context_uuid))
    existing_topology_ids = context_client.ListTopologyIds(context_id)
    existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
    if topology_uuid in existing_topology_uuids: return
    context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id)))
    context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id, name=name)))

def create_missing_topologies(
    context_client : ContextClient, context_id : ContextId, topology_uuids : List[str]
@@ -61,3 +61,21 @@ def get_topology(
    except grpc.RpcError:
        #LOGGER.exception('Unable to get topology({:s} / {:s})'.format(str(context_uuid), str(topology_uuid)))
        return None

def get_topology_details(
        context_client : ContextClient, topology_uuid : str, context_uuid : str = DEFAULT_CONTEXT_NAME,
        rw_copy : bool = False
    ) -> Optional[Topology]:
    try:
        # pylint: disable=no-member
        topology_id = TopologyId()
        topology_id.context_id.context_uuid.uuid = context_uuid
        topology_id.topology_uuid.uuid = topology_uuid
        ro_topology_details = context_client.GetTopologyDetails(topology_id)
        if not rw_copy: return ro_topology_details
        rw_topology_details = TopologyDetails()
        rw_topology_details.CopyFrom(ro_topology_details)
        return rw_topology_details
    except grpc.RpcError:
        #LOGGER.exception('Unable to get topology({:s} / {:s})'.format(str(context_uuid), str(topology_uuid)))
        return None
+62 −133

File changed.

Preview size limit exceeded, changes collapsed.

src/context/service/Events.py

deleted100644 → 0
+0 −42
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, time
from typing import Dict
from common.message_broker.Message import Message
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import EventTypeEnum

TOPIC_CONNECTION = 'connection'
TOPIC_CONTEXT    = 'context'
TOPIC_DEVICE     = 'device'
TOPIC_LINK       = 'link'
TOPIC_POLICY     = 'policy'
TOPIC_SERVICE    = 'service'
TOPIC_SLICE      = 'slice'
TOPIC_TOPOLOGY   = 'topology'

TOPICS = {
    TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY
}

CONSUME_TIMEOUT = 0.5 # seconds

def notify_event(
    messagebroker : MessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str]
) -> None:
    event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}}
    for field_name, field_value in fields.items():
        event[field_name] = field_value
    messagebroker.publish(Message(topic_name, json.dumps(event)))
Loading