Commit 704b6b79 authored by Jorge Moratinos's avatar Jorge Moratinos
Browse files

Add new object at loggin to send redis message to events service

parent 78fd87c5
Loading
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -16,13 +16,20 @@ class Subscriber():
        self.notification = Notifications()
        self.event_ops = InternalEventOperations()
        self.p = self.r.pubsub()
        self.p.subscribe("events", "internal-messages")
        self.p.subscribe("events", "internal-messages", "events-log")

    def listen(self):
        for raw_message in self.p.listen():
            current_app.logger.info(raw_message)
            if raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "events":
                current_app.logger.info("Event received")
                self.notification.send_notifications(raw_message["data"].decode('utf-8'))
            if raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "events-log":
                current_app.logger.info("Event-log received")
                event_redis=json.loads(raw_message["data"].decode('utf-8'))
                current_app.logger.info(json.dumps(event_redis, indent=4))
                self.notification.send_notifications_new(event_redis)


            elif raw_message["type"] == "message" and raw_message["channel"].decode('utf-8') == "internal-messages":
                message, *invoker_id = raw_message["data"].decode('utf-8').split(":")
+2 −2
Original line number Diff line number Diff line
@@ -20,10 +20,10 @@ class InternalEventOperations(Resource):
        #self.auth_manager.remove_auth_all_event(subscriber_id)

    def get_event_subscriptions(self, event):
        current_app.logger.info("get subscription from db")
        try:
            mycol = self.db.get_col_by_name(self.db.event_collection)

            query= {'events':event}
            query={'events':{'$in':[event]}}
            subscriptions = mycol.find(query)

            if  subscriptions is None:
+47 −0
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@ from ..encoder import JSONEncoder
import sys
import json
from flask import current_app
import asyncio
import aiohttp

class Notifications():

@@ -37,7 +39,52 @@ class Notifications():
            current_app.logger.error("An exception occurred ::" + str(e))
            return False

    def send_notifications_new(self, redis_event):
        try:
            if redis_event.get('event', None) == None:
                raise("Event value is not present on received event from REDIS")
            
            current_app.logger.info("Received event " + redis_event.get('event') + ", sending notifications")
            subscriptions = self.events_ops.get_event_subscriptions(redis_event.get('event'))
            # message, *ids = event.split(":")
            current_app.logger.info(subscriptions)

            for sub in subscriptions:
                url = sub["notification_destination"]
                current_app.logger.debug(url)
                event_detail=None
                if redis_event.get('key', None) != None and redis_event.get('information', None) != None:
                    # current_app.logger.debug(json.dumps(redis_event.get('information'),cls=JSONEncoder))
                    # event_detail=CAPIFEventDetail().from_dict({redis_event.get('key'):redis_event.get('information')})
                    # current_app.logger.debug(json.dumps(event_detail,cls=JSONEncoder))
                    event_detail={redis_event.get('key'):redis_event.get('information')}
                
                data = EventNotification(sub["subscription_id"], events=redis_event.get('event'), event_detail=[event_detail])
                current_app.logger.debug(json.dumps(data,cls=JSONEncoder))
                
                # self.request_post(url, data)
                asyncio.run(self.send(url, json.dumps(data,cls=JSONEncoder)))

        except Exception as e:
            current_app.logger.error("An exception occurred ::" + str(e))
            return False

    def request_post(self, url, data):
        headers = {'content-type': 'application/json'}
        return requests.post(url, json={'text': str(data.to_str())}, headers=headers)
    
    async def send_request(self, url, data):
        async with aiohttp.ClientSession() as session:
            timeout = aiohttp.ClientTimeout(total=10)  # Establecer timeout a 10 segundos
            async with session.post(url, json=data, timeout=timeout) as response:
                return await response.text()
    
    async def send(self, url, data):
        try:
            response = await self.send_request(url, data)
            print(response)
        except asyncio.TimeoutError:
            print("Timeout: Request timeout")


+2 −0
Original line number Diff line number Diff line
@@ -21,3 +21,5 @@ rfc3987
redis
flask_executor
Flask-APScheduler
aiohttp==3.9.5
async-timeout==4.0.3
+18 −5
Original line number Diff line number Diff line
@@ -13,9 +13,11 @@ from ..util import dict_to_camel_case, clean_empty
from .resources import Resource
from .responses import bad_request_error, internal_server_error, forbidden_error, not_found_error, unauthorized_error, make_response
from ..models.invocation_log import InvocationLog
from .publisher import Publisher
# from .publisher import Publisher
from .redis_event import RedisEvent
import copy

publisher_ops = Publisher()
# publisher_ops = Publisher()


class LoggingInvocationOperations(Resource):
@@ -82,17 +84,28 @@ class LoggingInvocationOperations(Resource):
                return result

            current_app.logger.debug("Check service apis")
            event=None
            invocation_log_base=json.loads(json.dumps(invocationlog, cls=JSONEncoder))

            for log in invocationlog.logs:
                result = self.__check_service_apis(log.api_id, log.api_name)

                current_app.logger.debug("Inside for loop.")
                if result is not None:
                    return result
                
                if log.result:
                    current_app.logger.debug(log)
                    if int(log.result) >= 200 and int(log.result) < 300:
                        publisher_ops.publish_message("events", "SERVICE_API_INVOCATION_SUCCESS")
                        event="SERVICE_API_INVOCATION_SUCCESS"
                    else:
                        publisher_ops.publish_message("events", "SERVICE_API_INVOCATION_FAILURE")
                        event="SERVICE_API_INVOCATION_FAILURE"

                    current_app.logger.info(event)
                    invocation_log_base['logs']=[log]
                    RedisEvent(event,invocation_log_base,"invocationLogs").send_event()

            current_app.logger.debug("After log check")

            current_app.logger.debug("Check existing logs")
            my_query = {'aef_id': aef_id, 'api_invoker_id': invocationlog.api_invoker_id}
Loading