Commit d3dcedca authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

intermediate backup of Device monitoring loops

parent 446e0db8
Loading
Loading
Loading
Loading
+36 −15
Original line number Diff line number Diff line
@@ -19,9 +19,9 @@ from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
@@ -109,26 +109,34 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        driver.Connect()

        endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
        #LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
        for _, resource_value in endpoints:
        LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
        for resource_key, resource_value in endpoints:
            endpoint_uuid = resource_value.get('uuid')
            endpoint_type = resource_value.get('type')
            str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
            update_or_create_object(
            db_endpoint, _ = update_or_create_object(
                self.database, EndPointModel, str_endpoint_key, {
                'device_fk'    : db_device,
                'endpoint_uuid': endpoint_uuid,
                'endpoint_type': endpoint_type,
                'resource_key' : resource_key,
            })
            sample_types = resource_value.get('sample_types', {})
            for sample_type, monitor_resource_key in sample_types.items():
                str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
                    'endpoint_fk'    : db_endpoint,
                    'resource_key'   : monitor_resource_key,
                    'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
                })

        running_config_rules = driver.GetConfig([RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES])
        running_config_rules = driver.GetConfig()
        running_config_rules = [
            (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
            for config_rule in running_config_rules
        ]
        #for running_config_rule in running_config_rules:
        #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))

        update_config(self.database, device_uuid, 'running', running_config_rules)

        initial_config_rules = driver.GetInitialConfig()
@@ -190,6 +198,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        if len(errors) > 0:
            raise OperationFailedException('ConfigureDevice', extra_details=errors)

        running_config_rules = driver.GetConfig()
        running_config_rules = [
            (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
            for config_rule in running_config_rules
        ]
        #for running_config_rule in running_config_rules:
        #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
        update_config(self.database, device_uuid, 'running', running_config_rules)

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

@@ -204,6 +221,9 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        self.driver_instance_cache.delete(device_uuid)
        delete_device_from_context(db_device, self.context_client)

        for db_kpi_pk,_ in db_device.references(KpiModel):
            KpiModel(self.database, db_kpi_pk).delete()

        for db_endpoint_pk,_ in db_device.references(EndPointModel):
            EndPointModel(self.database, db_endpoint_pk).delete()

@@ -252,6 +272,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
        sampling_resource = db_endpoint.resource_key

        attributes = {
            'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
@@ -273,8 +294,6 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

        sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid)

        #resources_to_subscribe   : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        #resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        #LOGGER.info('[ConfigureDevice] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe)))
@@ -289,10 +308,12 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        #    results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
        #    errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))

        results = driver.SubscribeState([
        subscriptions = [
            (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ])
        assert len(results) == 4
        ]
        results = driver.SubscribeState(subscriptions)
        LOGGER.info('results = {:s}'.format(str(results)))
        assert len(results) == len(subscriptions)
        for result in results: assert isinstance(result, bool) and result

        self.monitoring_loops.add(device_uuid, driver)
+14 −5
Original line number Diff line number Diff line
import logging, queue, threading
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from device.service.database.KpiModel import KpiModel
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto.monitoring_pb2 import Kpi
from .driver_api._Driver import _Driver
@@ -8,7 +11,8 @@ LOGGER = logging.getLogger(__name__)
QUEUE_GET_WAIT_TIMEOUT = 0.5

class MonitoringLoop:
    def __init__(self, driver : _Driver, samples_queue : queue.Queue) -> None:
    def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None:
        self._device_uuid = device_uuid
        self._driver = driver
        self._samples_queue = samples_queue
        self._running = threading.Event()
@@ -19,8 +23,7 @@ class MonitoringLoop:
    def _collect(self) -> None:
        for sample in self._samples_stream:
            if self._terminate.is_set(): break
            LOGGER.info('[MonitoringLoop:_collect] sample={:s}'.format(str(sample)))
            # TODO: add timestamp (if not present)
            sample = (self._device_uuid, *sample)
            self._samples_queue.put_nowait(sample)

    def start(self):
@@ -38,6 +41,7 @@ class MonitoringLoop:
class MonitoringLoops:
    def __init__(self, monitoring_client : MonitoringClient) -> None:
        self._monitoring_client = monitoring_client
        self._database = None
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
@@ -45,11 +49,14 @@ class MonitoringLoops:
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=False)

    def set_database(self, database : Database) -> None:
        self._database = database

    def add(self, device_uuid : str, driver : _Driver) -> None:
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
            if (monitoring_loop is not None) and monitoring_loop.is_running: return
            monitoring_loop = MonitoringLoop(driver, self._samples_queue)
            monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue)
            self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop
            monitoring_loop.start()

@@ -78,6 +85,8 @@ class MonitoringLoops:
                LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
            except queue.Empty:
                continue
            # TODO: find in database the KpiId, format KPI and send to Monitoring
            
            get_object(self._database, KpiModel)
            self._database
            kpi_data = {}
            self._monitoring_client.IncludeKpi(Kpi(**kpi_data))
+9 −0
Original line number Diff line number Diff line
import logging
from typing import Dict
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from .DeviceModel import DeviceModel
from .KpiSampleType import ORM_KpiSampleType
from .TopologyModel import TopologyModel

LOGGER = logging.getLogger(__name__)
@@ -15,6 +17,7 @@ class EndPointModel(Model):
    device_fk = ForeignKeyField(DeviceModel)
    endpoint_uuid = StringField(required=True, allow_empty=False)
    endpoint_type = StringField()
    resource_key = StringField(required=True, allow_empty=False)

    def dump_id(self) -> Dict:
        device_id = DeviceModel(self.database, self.device_fk).dump_id()
@@ -31,3 +34,9 @@ class EndPointModel(Model):
            'endpoint_id': self.dump_id(),
            'endpoint_type': self.endpoint_type,
        }

class EndPointMonitorModel(Model):
    pk = PrimaryKeyField()
    endpoint_fk = ForeignKeyField(EndPointModel)
    resource_key = StringField(required=True, allow_empty=False)
    kpi_sample_type = EnumeratedField(ORM_KpiSampleType, required=True)
+6 −1
Original line number Diff line number Diff line
import anytree
from typing import Any, List, Optional
from apscheduler.job import Job

class TreeNode(anytree.node.Node):
    def __init__(self, name, parent=None, children=None, **kwargs) -> None:
@@ -45,6 +46,9 @@ def set_subnode_value(resolver : anytree.Resolver, root : TreeNode, path : List[
            node = resolver.get(node, path_item)
        except anytree.ChildResolverError:
            node = TreeNode(path_item, parent=node)
    if isinstance(node.value, dict) and isinstance(value, dict):
        node.value.update(value)
    else:
        node.value = value

def dump_subtree(root : TreeNode):
@@ -56,5 +60,6 @@ def dump_subtree(root : TreeNode):
        if len(path) == 0: continue
        value = node.value
        if value is None: continue
        if isinstance(value, Job): value = str(value)
        results.append((path, value))
    return results
+26 −9
Original line number Diff line number Diff line
import json
import anytree, logging, pytz, queue, random, threading
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
@@ -6,6 +7,7 @@ from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES,
    _Driver)
@@ -22,11 +24,27 @@ SPECIAL_RESOURCE_MAPPINGS = {
def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]:
    endpoint_uuid = endpoint_data.get('uuid')
    if endpoint_uuid is None: return None
    endpoint_type = endpoint_data.get('type')
    if endpoint_type is None: return None
    endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
    endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid)
    endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type}

    endpoint_type = endpoint_data.get('type')
    if endpoint_type is None: return None

    endpoint_sample_types = endpoint_data.get('sample_types')
    if endpoint_sample_types is None: return None
    sample_types = {}
    for endpoint_sample_type in endpoint_sample_types:
        try:
            kpi_sample_type : ORM_KpiSampleType = grpc_to_enum__kpi_sample_type(endpoint_sample_type)
        except: # pylint: disable=bare-except
            LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format(
                str(endpoint_sample_type), str(endpoint_data)))
            continue
        metric_name = kpi_sample_type.name.lower()
        monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name)
        sample_types[endpoint_sample_type] = monitoring_resource_key

    endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types}
    return endpoint_resource_key, endpoint_resource_value

def do_sampling(resource_key : str, out_samples : queue.Queue):
@@ -105,12 +123,6 @@ class EmulatedDriver(_Driver):
                results.extend(dump_subtree(resource_node))
            return results

    #def GetResource(self, endpoint_uuid : str) -> Optional[str]:
    #    chk_string('endpoint_uuid', endpoint_uuid)
    #    return {
    #        #'key': 'value',
    #    }.get(endpoint_uuid)

    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
@@ -130,6 +142,11 @@ class EmulatedDriver(_Driver):
                    results.append(e) # if validation fails, store the exception
                    continue

                try:
                    resource_value = json.loads(resource_value)
                except: # pylint: disable=broad-except
                    pass

                set_subnode_value(resolver, self.__running, resource_path, resource_value)
                results.append(True)
        return results
Loading