Skip to content
Events.py 574 B
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import json, time
from typing import Dict
from common.message_broker.Message import Message
from common.message_broker.MessageBroker import MessageBroker
from context.proto.context_pb2 import EventTypeEnum

def notify_event(
    messagebroker : MessageBroker,
    topic_name : str,
    event_type : EventTypeEnum,
    fields : Dict[str, str],
) -> None:
    """Publish a JSON-encoded event notification on a broker topic.

    The payload is a dict whose ``'event'`` entry holds the current
    ``time.time()`` value under ``'timestamp'`` and ``event_type`` under
    ``'event_type'``. Every entry of ``fields`` is then copied in as a
    top-level key of the same dict, so a field named ``'event'`` replaces
    the header entry.

    Args:
        messagebroker: Object whose ``publish`` method receives the message.
        topic_name: First argument passed to ``Message``.
        event_type: Value stored under ``event['event_type']``.
        fields: Extra key/value pairs merged into the top level of the payload.
    """
    header = {'timestamp': time.time(), 'event_type': event_type}
    payload = {'event': header}
    # Merge caller-supplied fields after the header, in their iteration order.
    payload.update(fields.items())
    serialized = json.dumps(payload)
    messagebroker.publish(Message(topic_name, serialized))