Loading services/TS29222_CAPIF_Security_API/capif_security/core/redis_event.py 0 → 100644 +41 −0 Original line number Diff line number Diff line from ..encoder import JSONEncoder from .publisher import Publisher import json publisher_ops = Publisher() class RedisEvent(): def __init__(self, event, event_detail_key=None, information=None) -> None: self.EVENTS_ENUM = [ 'SERVICE_API_AVAILABLE', 'SERVICE_API_UNAVAILABLE', 'SERVICE_API_UPDATE', 'API_INVOKER_ONBOARDED', 'API_INVOKER_OFFBOARDED', 'SERVICE_API_INVOCATION_SUCCESS', 'SERVICE_API_INVOCATION_FAILURE', 'ACCESS_CONTROL_POLICY_UPDATE', 'ACCESS_CONTROL_POLICY_UNAVAILABLE', 'API_INVOKER_AUTHORIZATION_REVOKED', 'API_INVOKER_UPDATED', 'API_TOPOLOGY_HIDING_CREATED', 'API_TOPOLOGY_HIDING_REVOKED'] if event not in self.EVENTS_ENUM: raise Exception( "Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")") self.redis_event = { "event": event } if event_detail_key != None and information != None: self.redis_event['key'] = event_detail_key self.redis_event['information'] = information def to_string(self): return json.dumps(self.redis_event, cls=JSONEncoder) def send_event(self): publisher_ops.publish_message("events-log", self.to_string()) def __call__(self): return self.redis_event Loading
services/TS29222_CAPIF_Security_API/capif_security/core/redis_event.py 0 → 100644 +41 −0 Original line number Diff line number Diff line from ..encoder import JSONEncoder from .publisher import Publisher import json publisher_ops = Publisher() class RedisEvent(): def __init__(self, event, event_detail_key=None, information=None) -> None: self.EVENTS_ENUM = [ 'SERVICE_API_AVAILABLE', 'SERVICE_API_UNAVAILABLE', 'SERVICE_API_UPDATE', 'API_INVOKER_ONBOARDED', 'API_INVOKER_OFFBOARDED', 'SERVICE_API_INVOCATION_SUCCESS', 'SERVICE_API_INVOCATION_FAILURE', 'ACCESS_CONTROL_POLICY_UPDATE', 'ACCESS_CONTROL_POLICY_UNAVAILABLE', 'API_INVOKER_AUTHORIZATION_REVOKED', 'API_INVOKER_UPDATED', 'API_TOPOLOGY_HIDING_CREATED', 'API_TOPOLOGY_HIDING_REVOKED'] if event not in self.EVENTS_ENUM: raise Exception( "Event (" + event + ") is not on event enum (" + ','.join(self.EVENTS_ENUM) + ")") self.redis_event = { "event": event } if event_detail_key != None and information != None: self.redis_event['key'] = event_detail_key self.redis_event['information'] = information def to_string(self): return json.dumps(self.redis_event, cls=JSONEncoder) def send_event(self): publisher_ops.publish_message("events-log", self.to_string()) def __call__(self): return self.redis_event