diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py new file mode 100644 index 0000000000000000000000000000000000000000..f9b033cc3e4bc3584ccb79d18a54276777f970cd --- /dev/null +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -0,0 +1,254 @@ +import logging, requests, threading, json +from typing import Any, Iterator, List, Optional, Tuple, Union, Dict +from queue import Queue +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from device.service.driver_api._Driver import _Driver + +LOGGER = logging.getLogger(__name__) + +DRIVER_NAME = 'morpheus' +METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) + +class MorpheusApiDriver(_Driver): + def __init__(self, address: str, port: int, **settings) -> None: + super().__init__(DRIVER_NAME, address, port, **settings) + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + scheme = self.settings.get('scheme', 'http') + self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) + self.__timeout = int(self.settings.get('timeout', 120)) + self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'} + + self.__detection_thread = None + self.__pipeline_error_thread = None + + size = self.settings.get('queue_events_size', 10) + self.__pipeline_error_queue = Queue(maxsize=size) + self.__detection_queue = Queue(maxsize=size) + + def Connect(self) -> bool: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus' + with self.__lock: + if self.__started.is_set(): return True + try: + requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + self.__started.set() + return True + except requests.exceptions.Timeout: + LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root))) + return False + except Exception: # pylint: disable=broad-except + LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root))) + return False + + def Disconnect(self) -> bool: + with self.__lock: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.UnsubscribeDetectionEvent() + + if self.__pipeline_thread and self.__pipeline_thread.is_alive(): + self.UnsubscribePipelineError() + except Exception as e: + LOGGER.exception(f'Error during disconnect: {str(e)}') + + self.__terminate.set() + return True + + @metered_subclass_method(METRICS_POOL) + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + result = [] + + for key, value in config.items(): + result.append((key, value)) + + return result + except Exception as e: + LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root))) + return [] + + @metered_subclass_method(METRICS_POOL) + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + results = [] + + if not resource_keys: + for key, value in config.items(): + results.append((key, value)) + return results + + for key in resource_keys: + try: + results.append(config[key]) + except KeyError: + results.append(None) + except Exception as e: + results.append(e) + + return results + return [(key, None) for key in resource_keys] + except Exception as e: + LOGGER.exception(f'Error getting config: {str(e)}') + return [(key, e) for key in resource_keys] + + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + results = [] + with self.__lock: + config = dict(resources) + try: + response = requests.put(url, + headers=self.__headers, + json=config, + timeout=self.__timeout, + verify=False) + results.append(response.ok) + except Exception as e: + results.append(e) + return results + + def GetState(self) -> List[Tuple[str, Any]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + state = response.json() + result = [] + for key, value in state.items(): + result.append((key, value)) + return result + return [] + except Exception as e: + LOGGER.exception(f'Error getting state: {str(e)}') + return [] + + def StartPipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: + LOGGER.exception(f'Error starting pipeline: {e}') + return e + + def StopPipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: + LOGGER.exception(f'Error stopping pipeline: {e}') + return False + + @metered_subclass_method(METRICS_POOL) + def SubscribeDetectionEvent(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event' + with self.__lock: + try: + self.__detection_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__detection_queue), + daemon=True + ) + self.__detection_thread.start() + return True + except Exception as e: + LOGGER.exception(f'Error subscribing to detection events: {str(e)}') + return e + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.__detection_thread.join(timeout=5) + return True + except Exception as e: + LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}') + return e + + def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + event = self.__detection_queue.get(block=blocking, timeout=0.1) + if event is not None: + yield event + except queue.Empty: + if blocking: + continue + return + + def SubscribePipelineError(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error' + with self.__lock: + try: + self.__pipeline_error_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__pipeline_error_queue), + daemon=True + ) + self.__pipeline_error_thread.start() + return True + except Exception as e: + LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}') + return e + + def UnsubscribePipelineError(self) -> Union[bool, Exception]: + try: + if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): + self.__pipeline_error_thread.join(timeout=5) + return True + except Exception as e: + LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}') + return e + + def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1) + if error is not None: + yield error + except queue.Empty: + if blocking: + continue + return + + def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None: + try: + with requests.get(url, + headers=self.__headers, + stream=True, + verify=False) as response: + + if not response.ok: + LOGGER.error(f'Error connecting to event stream: {response.status_code}') + return + + try: + event = response.json() + queue.put(event['data']['ietf-restconf:notification']) + except json.JSONDecodeError as e: + LOGGER.error(f'Error parsing event: {e}') + except Exception as e: + LOGGER.exception(f'Error in notification stream handler: {str(e)}')