Skip to content
Snippets Groups Projects
Commit ebb001db authored by Javier Diaz's avatar Javier Diaz
Browse files

debugging

parent c41f38d9
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, threading
from typing import Any, Dict, Set
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltSliceId
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import add_device_to_topology
from common.tools.context_queries.Link import add_link_to_topology
from common.tools.context_queries.Topology import create_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from interdomain.client.InterdomainClient import InterdomainClient
LOGGER = logging.getLogger(__name__)
GET_EVENT_TIMEOUT = 0.5
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
class Clients:
def __init__(self) -> None:
self.context_client = ContextClient()
self.dlt_connector_client = DltConnectorClient()
self.dlt_gateway_client = DltGatewayClient()
self.interdomain_client = InterdomainClient()
def close(self) -> None:
self.interdomain_client.close()
self.dlt_gateway_client.close()
self.dlt_connector_client.close()
self.context_client.close()
class DltEventDispatcher(threading.Thread):
def __init__(self) -> None:
LOGGER.debug('Creating connector...')
super().__init__(name='DltEventDispatcher', daemon=True)
self._terminate = threading.Event()
LOGGER.debug('Connector created')
def start(self) -> None:
self._terminate.clear()
return super().start()
def stop(self):
self._terminate.set()
def run(self) -> None:
clients = Clients()
create_context(clients.context_client, DEFAULT_CONTEXT_NAME)
create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME)
create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME)
dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
dlt_events_collector.start()
while not self._terminate.is_set():
event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue
existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
local_domain_uuids = {
topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
}
local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME)
local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)
self.dispatch_event(clients, local_domain_uuids, event)
dlt_events_collector.stop()
clients.close()
def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
self._dispatch_device(clients, local_domain_uuids, event)
elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
self._dispatch_link(clients, local_domain_uuids, event)
elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE:
self._dispatch_slice(clients, local_domain_uuids, event)
else:
raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))
def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
domain_uuid : str = event.record_id.domain_uuid.uuid
if domain_uuid in local_domain_uuids:
MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
return
MSG = '[_dispatch_device] DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))
create_context(clients.context_client, domain_uuid)
create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)
device = Device(**json.loads(record.data_json))
clients.context_client.SetDevice(device)
device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid)
domain_context_id = ContextId(**json_context_id(domain_uuid))
add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Device')
def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
domain_uuid : str = event.record_id.domain_uuid.uuid
if domain_uuid in local_domain_uuids:
MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
return
MSG = '[_dispatch_link] DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record)))
link = Link(**json.loads(record.data_json))
clients.context_client.SetLink(link)
link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Link')
def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
domain_uuid : str = event.record_id.domain_uuid.uuid
LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record)))
slice_ = Slice(**json.loads(record.data_json))
context_uuid = slice_.slice_id.context_id.context_uuid.uuid
owner_uuid = slice_.slice_owner.owner_uuid.uuid
create_context(clients.context_client, context_uuid)
create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME)
if domain_uuid in local_domain_uuids:
# it is for "me"
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
try:
db_slice = clients.context_client.GetSlice(slice_.slice_id)
# exists
db_json_slice = grpc_message_to_json_string(db_slice)
except grpc.RpcError:
# not exists
db_json_slice = None
_json_slice = grpc_message_to_json_string(slice_)
if db_json_slice != _json_slice:
# not exists or is different...
slice_id = clients.interdomain_client.RequestSlice(slice_)
topology_id = TopologyId(**json_topology_id(domain_uuid))
dlt_slice_id = DltSliceId()
dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
clients.dlt_connector_client.RecordSlice(dlt_slice_id)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Slice')
elif owner_uuid in local_domain_uuids:
# it is owned by me
# just update it locally
LOGGER.info('[_dispatch_slice] updating locally')
local_slice = Slice()
local_slice.CopyFrom(slice_)
# pylint: disable=no-member
del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally
del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally
clients.context_client.SetSlice(local_slice)
else:
MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, threading
import asyncio
import logging
import json
from typing import Any, Dict, Set
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltSliceId
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordTypeEnum
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import add_device_to_topology
from common.tools.context_queries.Link import add_link_to_topology
......@@ -50,31 +38,30 @@ class Clients:
self.dlt_connector_client.close()
self.context_client.close()
class DltEventDispatcher(threading.Thread):
class DltEventDispatcher:
def __init__(self) -> None:
LOGGER.debug('Creating connector...')
super().__init__(name='DltEventDispatcher', daemon=True)
self._terminate = threading.Event()
self._terminate = asyncio.Event()
LOGGER.debug('Connector created')
def start(self) -> None:
async def start(self) -> None:
self._terminate.clear()
return super().start()
await self.run()
def stop(self):
async def stop(self):
self._terminate.set()
def run(self) -> None:
async def run(self) -> None:
clients = Clients()
create_context(clients.context_client, DEFAULT_CONTEXT_NAME)
create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME)
create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME)
dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
dlt_events_collector.start()
await dlt_events_collector.run()
while not self._terminate.is_set():
event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
event = await dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue
existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
......@@ -84,24 +71,24 @@ class DltEventDispatcher(threading.Thread):
local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME)
local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)
self.dispatch_event(clients, local_domain_uuids, event)
await self.dispatch_event(clients, local_domain_uuids, event)
dlt_events_collector.stop()
await dlt_events_collector.stop()
clients.close()
def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
async def dispatch_event(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None:
record_type: DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
self._dispatch_device(clients, local_domain_uuids, event)
await self._dispatch_device(clients, local_domain_uuids, event)
elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
self._dispatch_link(clients, local_domain_uuids, event)
await self._dispatch_link(clients, local_domain_uuids, event)
elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE:
self._dispatch_slice(clients, local_domain_uuids, event)
await self._dispatch_slice(clients, local_domain_uuids, event)
else:
raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))
def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
domain_uuid : str = event.record_id.domain_uuid.uuid
async def _dispatch_device(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None:
domain_uuid: str = event.record_id.domain_uuid.uuid
if domain_uuid in local_domain_uuids:
MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
......@@ -111,25 +98,25 @@ class DltEventDispatcher(threading.Thread):
MSG = '[_dispatch_device] DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
record = await clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))
create_context(clients.context_client, domain_uuid)
create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)
device = Device(**json.loads(record.data_json))
clients.context_client.SetDevice(device)
device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid)
domain_context_id = ContextId(**json_context_id(domain_uuid))
add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Device')
def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
domain_uuid : str = event.record_id.domain_uuid.uuid
async def _dispatch_link(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None:
domain_uuid: str = event.record_id.domain_uuid.uuid
if domain_uuid in local_domain_uuids:
MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
......@@ -139,25 +126,25 @@ class DltEventDispatcher(threading.Thread):
MSG = '[_dispatch_link] DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
record = await clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record)))
link = Link(**json.loads(record.data_json))
clients.context_client.SetLink(link)
link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
raise NotImplementedError('Delete Link')
def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
domain_uuid : str = event.record_id.domain_uuid.uuid
async def _dispatch_slice(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None:
event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
domain_uuid: str = event.record_id.domain_uuid.uuid
LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
record = await clients.dlt_gateway_client.GetFromDlt(event.record_id)
LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record)))
slice_ = Slice(**json.loads(record.data_json))
......@@ -185,7 +172,7 @@ class DltEventDispatcher(threading.Thread):
topology_id = TopologyId(**json_topology_id(domain_uuid))
dlt_slice_id = DltSliceId()
dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
clients.dlt_connector_client.RecordSlice(dlt_slice_id)
elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
......@@ -199,11 +186,10 @@ class DltEventDispatcher(threading.Thread):
local_slice.CopyFrom(slice_)
# pylint: disable=no-member
del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally
del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally
del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally
del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally
clients.context_client.SetSlice(local_slice)
else:
MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment