Commit 41122c06 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Multiple changes:

Common:
- Added missing test units for ORM framework
- Solved few bugs in ORM framework
- Implemented initial skeleton of MessageBroker to implement GetEvents methods for Context.

Context:
- Implemented TopologyModel and related test units
- Solved few bugs in ContextModel
parent 5c34ea77
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
#!/bin/bash

./report_coverage_all.sh | grep --color -E -i "^common/.*$|$"
./report_coverage_all.sh | grep -v -E "^(cent|comp|cont|devi|moni|serv|test)" | grep --color -E -i "^common/.*$|$"

report_coverage_common_orm.sh

deleted100755 → 0
+0 −3
Original line number Diff line number Diff line
#!/bin/bash

./report_coverage_all.sh | grep -v -E "^(cent|comp|cont|devi|moni|serv|test)" | grep --color -E -i "^common/orm/.*$|$"
+145 −0
Original line number Diff line number Diff line
import logging, sys, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Optional, Set

LOGGER = logging.getLogger(__name__)

SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds
CONSUME_TIMEOUT = 0.5 # seconds

class Watchdog:
    def __init__(self, dead_timeout) -> None:
        self._lock = threading.Lock()
        self._dead_timeout = dead_timeout
        self._timestamp = time.time()

    def refresh(self):
        with self._lock:
            self._timestamp = time.time()

    def is_alive(self):
        with self._lock:
            return (time.time() - self._timestamp) < self._dead_timeout

class Message(NamedTuple):
    topic: str
    content: str

class Subscriber(Watchdog):
    def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        super().__init__(dead_timeout)
        self._consume_timeout = consume_timeout
        self._queue = Queue()

    def publish(self, message : Message) -> None:
        self._queue.put_nowait(message)

    def consume(self) -> Optional[Message]:
        self.refresh()
        try:
            return self._queue.get(block=True, timeout=self._consume_timeout)
        except Empty:
            return None

class Topic:
    def __init__(self) -> None:
        self._subscribers : Set[Subscriber] = set()

    def add_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.add(subscriber)

    def remove_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.discard(subscriber)

    def publish(self, message : Message) -> None:
        dead_subscribers = set()
        for subscriber in self._subscribers:
            if subscriber.is_alive():
                subscriber.publish(message)
            else:
                dead_subscribers.add(subscriber)
        for dead_subscriber in dead_subscribers:
            self.remove_subscriber(dead_subscriber)

class MessageBroker:
    def __init__(self):
        self._terminate = threading.Event()
        self._topics : Dict[str, Topic] = {}

    def get_or_create_topic(self, topic_name) -> Topic:
        return self._topics.setdefault(topic_name, Topic())

    def terminate(self) -> None:
        self._terminate.set()

    def publish(self, message : Message) -> None:
        self.get_or_create_topic(message.topic).publish(message)

    def consume(
        self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT
        ) -> Iterator[Message]:

        subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout)
        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).add_subscriber(subscriber)

        while not self._terminate.is_set():
            message = subscriber.consume()
            if message is None: continue
            yield message

        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).remove_subscriber(subscriber)

class Consumer(threading.Thread):
    def __init__(
        self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT,
        dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:

        super().__init__(daemon=True)
        self._message_broker = message_broker
        self._topic_names = topic_names
        self._consume_timeout = consume_timeout
        self._dead_timeout = dead_timeout

    def run(self) -> None:
        print(time.time(), self.name, 'subscribes to', self._topic_names)
        messages = self._message_broker.consume(
            self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout)
        for message in messages:
            print(time.time(), self.name, 'receives', message)
        print(time.time(), self.name, 'terminates')

TOPIC_DEVICES  = 'devices'
TOPIC_LINKS    = 'links'
TOPIC_SERVICES = 'services'

def main():
    message_broker = MessageBroker()

    consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS})
    consumer1.start()

    consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES})
    consumer2.start()

    consumer3 = Consumer(message_broker, {TOPIC_SERVICES})
    consumer3.start()

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02'))
    message_broker.publish(Message(topic=TOPIC_LINKS,   content='new-link-01-02'))

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-02'))
    message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02'))

    message_broker.terminate()
    return 0

if __name__ == '__main__':
    sys.exit(main())
+0 −0

Empty file added.

+27 −11
Original line number Diff line number Diff line
@@ -123,14 +123,14 @@ class Model(metaclass=MetaModel):
        raise MutexException('Unable to unlock keys {:s} using owner_key {:s}'.format(
            str(lock_keys), str(self._owner_key)))

    def load(self) -> None:
    def load(self) -> bool:
        pk_field_name = self._pk_field_name # pylint: disable=no-member

        try:
            self.lock()

            attributes = self._backend.dict_get(self._instance_key)
            if attributes is None: return
            if attributes is None or len(attributes) == 0: return False
            for field_name in self._field_names_list: # pylint: disable=no-member
                if field_name == pk_field_name: continue
                if field_name not in attributes: continue
@@ -141,6 +141,7 @@ class Model(metaclass=MetaModel):
                    setattr(self, field_name + KEYWORD_STORED, field_value)
                    field_value = field_instance.foreign_model(self._database, field_value, auto_load=True)
                setattr(self, field_name, field_value)
            return True
        finally:
            self.unlock()

@@ -219,22 +220,37 @@ class Model(metaclass=MetaModel):
        finally:
            self.unlock(extra_keys=extra_keys)

    @staticmethod
    def get_model_name(model_or_str) -> str:
        if isinstance(model_or_str, str):
            return model_or_str
        if (type(model_or_str).__name__ == 'MetaModel') and issubclass(model_or_str, Model):
            return model_or_str.__name__
        raise Exception()

    def references(
        self, filter_by_models : Optional[Union[type, List[type], Set[type]]] = None) -> Set[Tuple[str, str]]:
        self, filter_by_models : Optional[Union[type, List[type], Set[type], Tuple[type]]] = None
        ) -> Set[Tuple[str, str]]:

        try:
            self.lock()
            if not self._backend.exists(self._references_key): return {}
            references = self._backend.set_get_all(self._references_key)
            try:
                if filter_by_models is None:
                    pass
            elif isinstance(filter_by_models, (list, set)):
                filter_by_models = {type(chk_issubclass(model, Model)).__name__ for model in filter_by_models}
            elif issubclass(filter_by_models, Model):
                filter_by_models = {type(chk_issubclass(filter_by_models, Model)).__name__}
                elif isinstance(filter_by_models, str):
                    filter_by_models = {filter_by_models}
                elif isinstance(filter_by_models, (list, set, tuple)):
                    filter_by_models = {Model.get_model_name(model_or_str) for model_or_str in filter_by_models}
                elif (type(filter_by_models).__name__ == 'MetaModel') and issubclass(filter_by_models, Model):
                    filter_by_models = {Model.get_model_name(filter_by_models)}
                else:
                    raise Exception()
            except Exception as e:
                msg = 'filter_by_models({:s}) unsupported. Expected a type or a list/set of types. Optionally, keep '\
                      'it as None to retrieve all the references pointing to this instance.'
                raise AttributeError(msg.format(str(filter_by_models)))
                raise AttributeError(msg.format(str(filter_by_models))) from e
            if filter_by_models:
                references = filter(lambda instance_key: instance_key.split('[', 1)[0] in filter_by_models, references)
            return {tuple(reference.rsplit(':', 1)) for reference in references}
Loading