Commit efc75241 authored by Jorge Moratinos's avatar Jorge Moratinos
Browse files

Change redis bus name to events, also code refactor of events

parent 45bd21d6
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ class RedisEvent():
        return json.dumps(self.redis_event, cls=JSONEncoder)

    def send_event(self):
        publisher_ops.publish_message("events-log", self.to_string())
        publisher_ops.publish_message("events", self.to_string())

    def __call__(self):
        return self.redis_event
+1 −1
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ class RedisEvent():
        return json.dumps(self.redis_event, cls=JSONEncoder)

    def send_event(self):
        publisher_ops.publish_message("events-log", self.to_string())
        publisher_ops.publish_message("events", self.to_string())

    def __call__(self):
        return self.redis_event
+4 −7
Original line number Diff line number Diff line
@@ -16,19 +16,16 @@ class Subscriber():
        self.notification = Notifications()
        self.event_ops = InternalEventOperations()
        self.p = self.r.pubsub()
        self.p.subscribe("events", "internal-messages", "events-log")
        self.p.subscribe("events", "internal-messages")

    def listen(self):
        for raw_message in self.p.listen():
            current_app.logger.info(raw_message)
            if raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "events":
                current_app.logger.info("Event received")
                self.notification.send_notifications(raw_message["data"].decode('utf-8'))
            elif raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "events-log":
                current_app.logger.info("Event-log received")
                event_redis=json.loads(raw_message["data"].decode('utf-8'))
                current_app.logger.info(json.dumps(event_redis, indent=4))
                self.notification.send_notifications_new(event_redis)
                redis_event=json.loads(raw_message["data"].decode('utf-8'))
                current_app.logger.info(json.dumps(redis_event, indent=4))
                self.notification.send_notifications(redis_event)
            elif raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "internal-messages":
                message, *subscriber_ids = raw_message["data"].decode('utf-8').split(":")
                if message == "invoker-removed" and len(subscriber_ids)>0:
+6 −20
Original line number Diff line number Diff line
@@ -16,21 +16,7 @@ class Notifications():
    def __init__(self):
        self.events_ops = InternalEventOperations()

    def send_notifications(self, event):
        current_app.logger.info("Received event, sending notifications")
        subscriptions = self.events_ops.get_event_subscriptions(event)

        try:
            for sub in subscriptions:
                url = sub["notification_destination"]
                data = EventNotification(sub["subscription_id"], events=event)
                self.request_post(url, data)

        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            return False

    def send_notifications_new(self, redis_event):
    def send_notifications(self, redis_event):
        try:
            if redis_event.get('event', None) == None:
                raise("Event value is not present on received event from REDIS")
@@ -69,9 +55,9 @@ class Notifications():
    async def send(self, url, data):
        try:
            response = await self.send_request(url, data)
            print(response)
            current_app.logger.debug(response)
        except asyncio.TimeoutError:
            print("Timeout: Request timeout")


            current_app.logger.error("Timeout: Request timeout")
        except Exception as e:
            current_app.logger.error("An exception occurred sending notification::" + str(e))
            return False
+1 −1
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ class RedisEvent():
        return json.dumps(self.redis_event, cls=JSONEncoder)

    def send_event(self):
        publisher_ops.publish_message("events-log", self.to_string())
        publisher_ops.publish_message("events", self.to_string())

    def __call__(self):
        return self.redis_event
Loading