# MorpheusApiDriver.py
import json
import logging
import threading
from queue import Empty, Queue
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import requests

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from device.service.driver_api._Driver import _Driver

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Driver name handed to the _Driver base class and used as the 'driver' metrics label.
DRIVER_NAME = 'morpheus'
# Metrics pool passed to @metered_subclass_method on the driver's methods.
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})

class MorpheusApiDriver(_Driver):
    """Device driver that talks to a Morpheus instance over its RESTCONF HTTP API.

    Configuration and state are read/written with plain HTTP requests under
    ``/restconf/data/naudit-morpheus:morpheus``. Detection events and pipeline
    errors are fetched by background threads from
    ``/restconf/streams/naudit-morpheus:morpheus`` and handed to consumers
    through bounded queues.
    """

    def __init__(self, address: str, port: int, **settings) -> None:
        """Store connection parameters and create the (initially empty) event queues.

        Recognised ``settings`` keys (all optional):
            scheme: URL scheme, defaults to ``'http'``.
            timeout: per-request timeout in seconds, defaults to ``120``.
            queue_events_size: ``maxsize`` of each event queue, defaults to ``10``.
        """
        super().__init__(DRIVER_NAME, address, port, **settings)
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        scheme = self.settings.get('scheme', 'http')
        self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
        self.__timeout = int(self.settings.get('timeout', 120))
        self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'}

        # Worker threads are created lazily by the Subscribe* methods.
        self.__detection_thread = None
        self.__pipeline_error_thread = None

        size = self.settings.get('queue_events_size', 10)
        self.__pipeline_error_queue = Queue(maxsize=size)
        self.__detection_queue = Queue(maxsize=size)

    def Connect(self) -> bool:
        """Probe the Morpheus RESTCONF root and mark the driver as started.

        Returns:
            True if the driver was already started or the probe request
            completed (the HTTP status code is not inspected); False if the
            request timed out or raised any other exception.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
        with self.__lock:
            if self.__started.is_set():
                return True
            try:
                requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
            except requests.exceptions.Timeout:
                LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root)))
                return False
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root)))
                return False
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Join any live subscription threads and raise the terminate flag.

        Errors while unsubscribing are logged and do not prevent termination.

        Returns:
            Always True.
        """
        with self.__lock:
            try:
                if self.__detection_thread and self.__detection_thread.is_alive():
                    self.UnsubscribeDetectionEvent()

                # The attribute is __pipeline_error_thread; the previous name
                # (__pipeline_thread) was never assigned, so this branch always
                # raised AttributeError and the thread was never joined.
                if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                    self.UnsubscribePipelineError()
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error during disconnect: {str(e)}')

            # Makes GetDetectionEvent / GetPipelineError generators stop.
            self.__terminate.set()
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Fetch the full Morpheus configuration.

        Returns:
            The top-level items of the JSON configuration as ``(key, value)``
            tuples, or an empty list if the request fails, the response status
            is not OK, or the body cannot be processed.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if response.ok:
                    return list(response.json().items())
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(
        self, resource_keys: Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Fetch configuration values, optionally restricted to some keys.

        Args:
            resource_keys: top-level configuration keys to return. When empty
                or omitted, every top-level item is returned.

        Returns:
            A list of ``(key, value)`` tuples. For a requested key the value is
            the configured value, ``None`` if the key is absent (or the
            response status is not OK), or the exception raised while
            retrieving it.
        """
        # Avoid a shared mutable default; None and [] both mean "everything".
        resource_keys = resource_keys or []
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if not response.ok:
                    return [(key, None) for key in resource_keys]

                config = response.json()
                if not resource_keys:
                    return list(config.items())

                results = []
                for key in resource_keys:
                    # Always emit (key, value) pairs so every code path of
                    # this method returns the same shape.
                    try:
                        results.append((key, config[key]))
                    except KeyError:
                        results.append((key, None))
                    except Exception as e:  # pylint: disable=broad-except
                        results.append((key, e))
                return results
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error getting config: {str(e)}')
                return [(key, e) for key in resource_keys]

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Replace the Morpheus configuration with the given resources.

        All resources are merged into one JSON object and sent in a single PUT.

        Args:
            resources: ``(key, value)`` pairs to write.

        Returns:
            A one-element list: ``response.ok`` of the PUT request, or the
            exception raised while sending it.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
        results = []
        with self.__lock:
            config = dict(resources)
            try:
                response = requests.put(url,
                                        headers=self.__headers,
                                        json=config,
                                        timeout=self.__timeout,
                                        verify=False)
                results.append(response.ok)
            except Exception as e:  # pylint: disable=broad-except
                results.append(e)
        return results

    def GetState(self) -> List[Tuple[str, Any]]:
        """Fetch the Morpheus state.

        Returns:
            The top-level items of the JSON state as ``(key, value)`` tuples,
            or an empty list if the response status is not OK or an exception
            is raised.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
        with self.__lock:
            try:
                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                if response.ok:
                    return list(response.json().items())
                return []
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error getting state: {str(e)}')
                return []

    def StartPipeline(self) -> Union[bool, Exception]:
        """POST to the ``start`` endpoint.

        Returns:
            True on a successful (non-error status) response; otherwise the
            exception that was raised, which is also logged.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
        with self.__lock:
            try:
                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                response.raise_for_status()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error starting pipeline: {e}')
                return e

    def StopPipeline(self) -> Union[bool, Exception]:
        """POST to the ``stop`` endpoint.

        Returns:
            True on a successful (non-error status) response; False if the
            request raised (the exception is logged). Note that, unlike
            StartPipeline, the exception itself is not returned.
        """
        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
        with self.__lock:
            try:
                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                response.raise_for_status()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error stopping pipeline: {e}')
                return False

    @metered_subclass_method(METRICS_POOL)
    def SubscribeDetectionEvent(self) -> Union[bool, Exception]:
        """Start a daemon thread that reads the detection-event stream.

        Received notifications are placed on the detection queue and can be
        consumed with GetDetectionEvent.

        Returns:
            True if the thread was started; otherwise the exception raised.
        """
        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
        with self.__lock:
            try:
                self.__detection_thread = threading.Thread(
                    target=self.__handle_notification_stream,
                    args=(url, self.__detection_queue),
                    daemon=True,
                )
                self.__detection_thread.start()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
                return e

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]:
        """Wait up to 5 seconds for the detection-event thread to finish.

        The thread is only joined; nothing here signals it to stop.

        Returns:
            True if no exception occurred; otherwise the exception raised.
        """
        try:
            if self.__detection_thread and self.__detection_thread.is_alive():
                self.__detection_thread.join(timeout=5)
            return True
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
            return e

    def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]:
        """Yield detection events collected so far.

        Args:
            blocking: if False, stop as soon as the queue is empty; if True,
                keep polling until the driver is disconnected or ``terminate``
                is set.
            terminate: optional event that ends the iteration when set.

        Yields:
            Queued detection notifications (``None`` entries are skipped).
        """
        yield from self.__iter_queue(self.__detection_queue, blocking, terminate)

    def SubscribePipelineError(self) -> Union[bool, Exception]:
        """Start a daemon thread that reads the pipeline-error stream.

        Received notifications are placed on the pipeline-error queue and can
        be consumed with GetPipelineError.

        Returns:
            True if the thread was started; otherwise the exception raised.
        """
        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
        with self.__lock:
            try:
                self.__pipeline_error_thread = threading.Thread(
                    target=self.__handle_notification_stream,
                    args=(url, self.__pipeline_error_queue),
                    daemon=True,
                )
                self.__pipeline_error_thread.start()
                return True
            except Exception as e:  # pylint: disable=broad-except
                LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
                return e

    def UnsubscribePipelineError(self) -> Union[bool, Exception]:
        """Wait up to 5 seconds for the pipeline-error thread to finish.

        The thread is only joined; nothing here signals it to stop.

        Returns:
            True if no exception occurred; otherwise the exception raised.
        """
        try:
            if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                self.__pipeline_error_thread.join(timeout=5)
            return True
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
            return e

    def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]:
        """Yield pipeline errors collected so far.

        Args:
            blocking: if False, stop as soon as the queue is empty; if True,
                keep polling until the driver is disconnected or ``terminate``
                is set.
            terminate: optional event that ends the iteration when set.

        Yields:
            Queued pipeline-error notifications (``None`` entries are skipped).
        """
        yield from self.__iter_queue(self.__pipeline_error_queue, blocking, terminate)

    def __iter_queue(
        self, source: Queue, blocking: bool, terminate: Optional[threading.Event]
    ) -> Iterator[Dict]:
        """Drain ``source``, honouring the driver-wide and caller-supplied stop flags."""
        while not self.__terminate.is_set():
            if terminate is not None and terminate.is_set():
                break
            try:
                # The timeout only applies when blocking; a non-blocking get
                # raises Empty immediately.
                item = source.get(block=blocking, timeout=0.1)
            except Empty:
                if blocking:
                    continue  # poll again so the stop flags are re-checked
                return
            if item is not None:
                yield item

    # The annotation is quoted so it is not evaluated at definition time
    # (queue.Queue is only subscriptable on Python 3.9+).
    def __handle_notification_stream(self, url: str, queue: 'Queue[Any]') -> None:
        """Thread target: GET ``url`` and enqueue the notification it returns.

        The response body is parsed as one JSON document and its
        ``['data']['ietf-restconf:notification']`` member is put on ``queue``.
        A non-OK status or a JSON decoding failure is logged as an error; any
        other exception (including a missing key) is logged with its traceback.
        """
        try:
            # No timeout here, unlike the other requests made by this driver.
            with requests.get(url,
                              headers=self.__headers,
                              stream=True,
                              verify=False) as response:

                if not response.ok:
                    LOGGER.error(f'Error connecting to event stream: {response.status_code}')
                    return

                try:
                    event = response.json()
                    queue.put(event['data']['ietf-restconf:notification'])
                except json.JSONDecodeError as e:
                    LOGGER.error(f'Error parsing event: {e}')
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception(f'Error in notification stream handler: {str(e)}')