import json
import logging
import threading
from queue import Empty, Queue
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import requests

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from device.service.driver_api._Driver import _Driver

LOGGER = logging.getLogger(__name__)

DRIVER_NAME = 'morpheus'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})


class MorpheusApiDriver(_Driver):
    """Device driver that talks to a Morpheus instance over RESTCONF-style HTTP URLs.

    All request/response calls are serialised through a single lock. Event
    subscriptions run in daemon threads that push notifications into bounded
    queues, which are drained through ``GetDetectionEvent`` and
    ``GetPipelineError``.

    Every HTTP call is issued with ``verify=False``, i.e. TLS certificate
    verification is disabled.
    """

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store connection parameters and create the (empty) event queues.

        Settings read here: ``scheme`` (default ``'http'``), ``timeout`` in
        seconds (default ``120``) and ``queue_events_size`` (default ``10``).
        """
        super().__init__(DRIVER_NAME, address, port, **settings)
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        scheme = self.settings.get('scheme', 'http')
        self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
        self.__timeout = int(self.settings.get('timeout', 120))
        self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'}
        self.__detection_thread = None
        self.__pipeline_error_thread = None
        size = self.settings.get('queue_events_size', 10)
        self.__pipeline_error_queue = Queue(maxsize=size)
        self.__detection_queue = Queue(maxsize=size)

    def Connect(self) -> bool:
        """Probe the Morpheus root resource and mark the driver as started.

        Returns:
            ``True`` when already started or when the probe request completed
            without raising; ``False`` when the request raised (the error is
            logged). The HTTP status code of the probe is not inspected.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
        with self.__lock:
            if self.__started.is_set():
                return True
            try:
                requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
            except requests.exceptions.Timeout:
                LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root)))
                return False
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root)))
                return False
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Wait briefly for running subscription threads and raise the terminate flag.

        Errors while unsubscribing are logged and do not prevent termination.

        Returns:
            Always ``True``.
        """
        with self.__lock:
            try:
                if self.__detection_thread and self.__detection_thread.is_alive():
                    self.UnsubscribeDetectionEvent()
                # The attribute is ``__pipeline_error_thread``; reading the
                # non-existent ``__pipeline_thread`` raised AttributeError,
                # which skipped this unsubscribe on every disconnect.
                if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                    self.UnsubscribePipelineError()
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error during disconnect: {str(e)}')
            # Stops the GetDetectionEvent / GetPipelineError generators.
            self.__terminate.set()
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Fetch the Morpheus configuration as a list of ``(key, value)`` pairs.

        Returns:
            The items of the JSON object returned by the ``config`` resource,
            or an empty list when the response is not OK or the request fails
            (failures are logged).
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if response.ok:
                    return list(response.json().items())
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(
        self, resource_keys: Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Fetch configuration values, optionally restricted to ``resource_keys``.

        Args:
            resource_keys: Keys to look up. When empty or ``None`` the whole
                configuration is returned.

        Returns:
            A list of ``(key, value)`` tuples. For a requested key the value is
            ``None`` when the key is missing or the response is not OK, and the
            raised exception when the lookup or the request fails.
        """
        keys = list(resource_keys) if resource_keys else []
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if not response.ok:
                    return [(key, None) for key in keys]

                config = response.json()
                if not keys:
                    return list(config.items())

                results: List[Tuple[str, Union[Any, None, Exception]]] = []
                for key in keys:
                    # Always emit (key, value) tuples so every return path has
                    # the shape promised by the annotation.
                    try:
                        results.append((key, config[key]))
                    except KeyError:
                        results.append((key, None))
                    except Exception as e:  # pylint: disable=broad-except
                        results.append((key, e))
                return results
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error getting config: {str(e)}')
                return [(key, e) for key in keys]

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Replace the Morpheus configuration with ``resources`` in a single PUT.

        Args:
            resources: ``(key, value)`` pairs; they are merged into one JSON
                object that is sent as the request body.

        Returns:
            A one-element list: ``response.ok`` of the PUT, or the exception
            raised while sending it.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        results: List[Union[bool, Exception]] = []
        with self.__lock:
            config = dict(resources)
            try:
                response = requests.put(
                    url, headers=self.__headers, json=config, timeout=self.__timeout, verify=False
                )
                results.append(response.ok)
            except Exception as e:  # pylint: disable=broad-except
                results.append(e)
        return results

    def GetState(self) -> List[Tuple[str, Any]]:
        """Fetch the Morpheus state as a list of ``(key, value)`` pairs.

        Returns:
            The items of the JSON object returned by the ``state`` resource,
            or an empty list when the response is not OK or the request fails
            (failures are logged).
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if response.ok:
                    return list(response.json().items())
                return []
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error getting state: {str(e)}')
                return []

    def StartPipeline(self) -> Union[bool, Exception]:
        """POST to the ``start`` resource.

        Returns:
            ``True`` on a successful status code; otherwise the exception that
            was raised (including the HTTP error from ``raise_for_status``).
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
        with self.__lock:
            try:
                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                response.raise_for_status()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error starting pipeline: {e}')
                return e

    def StopPipeline(self) -> Union[bool, Exception]:
        """POST to the ``stop`` resource.

        Returns:
            ``True`` on a successful status code, ``False`` when the request
            raised or returned an HTTP error (the error is logged).
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
        with self.__lock:
            try:
                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                response.raise_for_status()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error stopping pipeline: {e}')
                return False

    @metered_subclass_method(METRICS_POOL)
    def SubscribeDetectionEvent(self) -> Union[bool, Exception]:
        """Start a daemon thread that feeds detection events into the detection queue.

        Returns:
            ``True`` once the thread has been started, or the exception raised
            while creating/starting it.
        """
        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
        with self.__lock:
            try:
                self.__detection_thread = threading.Thread(
                    target=self.__handle_notification_stream,
                    args=(url, self.__detection_queue),
                    daemon=True
                )
                self.__detection_thread.start()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
                return e

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]:
        """Wait up to 5 seconds for the detection thread to finish.

        The thread is only joined; nothing here signals it to stop.

        Returns:
            ``True``, or the exception raised while joining.
        """
        try:
            if self.__detection_thread and self.__detection_thread.is_alive():
                self.__detection_thread.join(timeout=5)
            return True
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
            return e

    def GetDetectionEvent(
        self, blocking=False, terminate: Optional[threading.Event] = None
    ) -> Iterator[Dict]:
        """Yield queued detection events.

        Args:
            blocking: When ``False`` the iteration ends as soon as the queue is
                empty. When ``True`` it keeps polling (0.1 s per attempt) until
                the driver is disconnected or ``terminate`` is set.
            terminate: Optional external event that ends the iteration.
        """
        return self.__iter_events(self.__detection_queue, blocking, terminate)

    def SubscribePipelineError(self) -> Union[bool, Exception]:
        """Start a daemon thread that feeds pipeline errors into the error queue.

        Returns:
            ``True`` once the thread has been started, or the exception raised
            while creating/starting it.
        """
        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
        with self.__lock:
            try:
                self.__pipeline_error_thread = threading.Thread(
                    target=self.__handle_notification_stream,
                    args=(url, self.__pipeline_error_queue),
                    daemon=True
                )
                self.__pipeline_error_thread.start()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
                return e

    def UnsubscribePipelineError(self) -> Union[bool, Exception]:
        """Wait up to 5 seconds for the pipeline-error thread to finish.

        The thread is only joined; nothing here signals it to stop.

        Returns:
            ``True``, or the exception raised while joining.
        """
        try:
            if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                self.__pipeline_error_thread.join(timeout=5)
            return True
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
            return e

    def GetPipelineError(
        self, blocking=False, terminate: Optional[threading.Event] = None
    ) -> Iterator[Dict]:
        """Yield queued pipeline errors.

        Args:
            blocking: When ``False`` the iteration ends as soon as the queue is
                empty. When ``True`` it keeps polling (0.1 s per attempt) until
                the driver is disconnected or ``terminate`` is set.
            terminate: Optional external event that ends the iteration.
        """
        return self.__iter_events(self.__pipeline_error_queue, blocking, terminate)

    def __iter_events(
        self, source: Queue, blocking: bool, terminate: Optional[threading.Event]
    ) -> Iterator[Dict]:
        """Drain ``source``, yielding every non-``None`` item until told to stop."""
        while True:
            if self.__terminate.is_set():
                break
            if terminate is not None and terminate.is_set():
                break
            try:
                # The timeout only applies when blocking; a non-blocking get
                # raises Empty immediately.
                item = source.get(block=blocking, timeout=0.1)
            except Empty:
                if blocking:
                    # Poll again so the terminate flags are re-checked.
                    continue
                return
            if item is not None:
                yield item

    def __handle_notification_stream(self, url: str, events: 'Queue[Any]') -> None:
        """Thread target: read one notification from ``url`` and enqueue it.

        The response body is parsed as a single JSON document and the value at
        ``['data']['ietf-restconf:notification']`` is put on ``events``.
        Failures are logged and never propagate out of the thread.

        NOTE(review): ``response.json()`` consumes the whole body, so on a
        long-lived stream this presumably returns only when the server closes
        the connection, and at most one event is enqueued per subscription.
        Confirm this matches the server's behaviour.
        """
        try:
            with requests.get(url, headers=self.__headers, stream=True, verify=False) as response:
                if not response.ok:
                    LOGGER.error(f'Error connecting to event stream: {response.status_code}')
                    return
                try:
                    event = response.json()
                    # Blocks while the bounded queue is full.
                    events.put(event['data']['ietf-restconf:notification'])
                except json.JSONDecodeError as e:
                    LOGGER.error(f'Error parsing event: {e}')
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error in notification stream handler: {str(e)}')