Newer
Older
import json, time
from typing import Dict
from common.message_broker.Message import Message
from common.message_broker.MessageBroker import MessageBroker
from context.proto.context_pb2 import EventTypeEnum
def notify_event(
    messagebroker : MessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str]) -> None:
    """Publish a JSON-encoded event notification on a broker topic.

    The payload is a JSON object whose 'event' key holds the current
    timestamp (``time.time()``) and the given event type; every entry of
    ``fields`` is then copied onto the top level of that same object.

    Args:
        messagebroker: Broker whose ``publish`` method receives the message.
        topic_name: Topic passed to the ``Message`` constructor.
        event_type: Value stored under ``event.event_type``; it is handed to
            ``json.dumps`` as-is, so it must be JSON-serializable.
        fields: Extra top-level entries to add to the payload.

    Note:
        Fields are merged after the header is built, so a field named
        'event' replaces the timestamp/event_type header.
    """
    header = {'timestamp': time.time(), 'event_type': event_type}
    payload = {'event': header}
    # Merge caller-supplied fields alongside the header at the top level.
    payload.update(fields.items())
    serialized = json.dumps(payload)
    messagebroker.publish(Message(topic_name, serialized))