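# NOTE(review): stray page-navigation text ("Newer" / "Older") was captured here; it is not part of the module.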
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, queue, threading
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# The level is set to DEBUG explicitly at import time, regardless of any
# level configured elsewhere for this logger name before the import.
LOGGER.setLevel(logging.DEBUG)
class EventsCollector:
    """Funnel events from several Context event streams into a single queue.

    For every activated collector, the constructor opens the matching event
    stream on the given client and prepares (but does not start) a thread that
    copies each received event into one shared in-memory queue. Call
    ``start()`` to launch the threads, ``get_event()`` / ``get_events()`` to
    consume what has been collected, and ``stop()`` to cancel the streams and
    wait for the threads to finish.
    """

    def __init__(
        self, context_client : 'ContextClient',
        log_events_received : bool = False,
        activate_context_collector : bool = True,
        activate_topology_collector : bool = True,
        activate_device_collector : bool = True,
        activate_link_collector : bool = True,
        activate_service_collector : bool = True,
        activate_slice_collector : bool = True,
        activate_connection_collector : bool = True,
    ) -> None:
        """Open the requested event streams and prepare their collector threads.

        Args:
            context_client: client whose ``Get<Entity>Events`` methods are
                called (with an ``Empty`` request) to open each stream.
            log_events_received: when True, every received event is logged at
                INFO level before being queued.
            activate_*_collector: one flag per entity kind; when False the
                corresponding stream is not opened and both its stream and
                thread attributes stay ``None``.
        """
        self._events_queue = queue.Queue()
        self._log_events_received = log_events_received

        self._context_stream, self._context_thread = None, None
        if activate_context_collector:
            self._context_stream = context_client.GetContextEvents(Empty())
            self._context_thread = self._create_collector_thread(self._context_stream)

        self._topology_stream, self._topology_thread = None, None
        if activate_topology_collector:
            self._topology_stream = context_client.GetTopologyEvents(Empty())
            self._topology_thread = self._create_collector_thread(self._topology_stream)

        self._device_stream, self._device_thread = None, None
        if activate_device_collector:
            self._device_stream = context_client.GetDeviceEvents(Empty())
            self._device_thread = self._create_collector_thread(self._device_stream)

        self._link_stream, self._link_thread = None, None
        if activate_link_collector:
            self._link_stream = context_client.GetLinkEvents(Empty())
            self._link_thread = self._create_collector_thread(self._link_stream)

        self._service_stream, self._service_thread = None, None
        if activate_service_collector:
            self._service_stream = context_client.GetServiceEvents(Empty())
            self._service_thread = self._create_collector_thread(self._service_stream)

        self._slice_stream, self._slice_thread = None, None
        if activate_slice_collector:
            self._slice_stream = context_client.GetSliceEvents(Empty())
            self._slice_thread = self._create_collector_thread(self._slice_stream)

        self._connection_stream, self._connection_thread = None, None
        if activate_connection_collector:
            self._connection_stream = context_client.GetConnectionEvents(Empty())
            self._connection_thread = self._create_collector_thread(self._connection_stream)

    def _create_collector_thread(self, stream, as_daemon : bool = False):
        """Build (without starting) a thread that runs ``_collect`` on ``stream``."""
        return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon)

    def _collector_streams(self):
        """Return the streams that were opened, in the order they were created."""
        streams = (
            self._context_stream, self._topology_stream, self._device_stream,
            self._link_stream, self._service_stream, self._slice_stream,
            self._connection_stream,
        )
        return [stream for stream in streams if stream is not None]

    def _collector_threads(self):
        """Return the collector threads that were created, in creation order."""
        threads = (
            self._context_thread, self._topology_thread, self._device_thread,
            self._link_thread, self._service_thread, self._slice_thread,
            self._connection_thread,
        )
        return [thread for thread in threads if thread is not None]

    def _collect(self, events_stream) -> None:
        """Thread body: copy every event yielded by ``events_stream`` into the queue.

        An ``grpc.RpcError`` whose code is ``CANCELLED`` ends the loop quietly;
        any other ``grpc.RpcError`` is re-raised inside the collector thread.
        """
        try:
            for event in events_stream:
                if self._log_events_received:
                    LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Start every collector thread that was created by the constructor."""
        for thread in self._collector_threads():
            thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Return the next queued event, or ``None`` if none could be obtained.

        ``block`` and ``timeout`` are forwarded to ``queue.Queue.get``; an
        empty queue is reported as ``None`` instead of raising ``queue.Empty``.
        """
        try:
            return self._events_queue.get(block=block, timeout=timeout)
        except queue.Empty: # pylint: disable=catching-non-exception
            return None

    def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
        """Retrieve several queued events, sorted by ``event.timestamp.timestamp``.

        Args:
            block: forwarded to ``get_event`` for every attempt.
            timeout: forwarded to ``get_event`` for every attempt.
            count: when ``None``, events are read until one attempt yields
                nothing. Otherwise exactly ``count`` attempts are made and the
                attempts that yield nothing are skipped, so fewer than
                ``count`` events may be returned.

        Returns:
            A new list with the retrieved events in ascending timestamp order.
        """
        events = []
        if count is None:
            while True:
                event = self.get_event(block=block, timeout=timeout)
                if event is None: break
                events.append(event)
        else:
            for _ in range(count):
                event = self.get_event(block=block, timeout=timeout)
                # An empty attempt still consumes one of the `count` tries.
                if event is None: continue
                events.append(event)
        return sorted(events, key=lambda e: e.event.timestamp.timestamp)

    def stop(self):
        """Cancel every opened stream, then wait for the collector threads to end.

        NOTE(review): the header of this method was absent from the reviewed
        text; the name ``stop`` was chosen as the counterpart of ``start`` --
        confirm against callers.
        """
        for stream in self._collector_streams():
            stream.cancel()
        for thread in self._collector_threads():
            # join() raises RuntimeError on a thread that was never started;
            # skipping non-running threads lets stop() be called before start().
            if thread.is_alive(): thread.join()