Skip to content
mq.go 7.8 KiB
Newer Older
/*
 * Copyright (c) 2020  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package mq

import (
	"encoding/json"
	"errors"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

type Msg struct {
	SrcName      string            `json:"src,omitempty"`
	SrcNamespace string            `json:"src-ns,omitempty"`
	DstName      string            `json:"dst,omitempty"`
	DstNamespace string            `json:"dst-ns,omitempty"`
	Scope        string            `json:"scope,omitempty"`
	Message      Message           `json:"msg,omitempty"`
	Payload      map[string]string `json:"payload,omitempty"`
}

type MsgHandler struct {
	Handler  func(msg *Msg, userData interface{})
	UserData interface{}
}

type MsgQueue struct {
	name            string
	moduleName      string
	moduleNamespace string
	rc              *redis.Connector
	handlers        map[int]MsgHandler
	counter         int
}

// Messages
type Message string

const (
	// Sandbox Control
	MsgSandboxCreate  Message = "SANDBOX-CREATE"
	MsgSandboxDestroy Message = "SANDBOX-DESTROY"

	// Scenario Management
	MsgScenarioActivate  Message = "SCENARIO-ACTIVATE"
	MsgScenarioUpdate    Message = "SCENARIO-UPDATE"
	MsgScenarioTerminate Message = "SCENARIO-TERMINATE"
	// PDU Session Management
	MsgPduSessionCreated    Message = "PDU-SESSION-CREATED"
	MsgPduSessionTerminated Message = "PDU-SESSION-TERMINATED"

	// Mobility Groups
	MsgMgLbRulesUpdate Message = "MG-LB-RULES-UPDATE"

	// Traffic Control
	MsgTcLbRulesUpdate  Message = "TC-LB-RULES-UPDATE"
	MsgTcNetRulesUpdate Message = "TC-NET-RULES-UPDATE"

	// GIS Engine
	MsgGeUpdate Message = "GIS-ENGINE-UPDATE"

	// App Enablement
	MsgMecSvcUpdate Message = "MEC-SVC-UPDATE"

	// Watchdog
	MsgPing Message = "PING"
	MsgPong Message = "PONG"
const globalQueueName = "mq:global"
const localQueueNamePrefix = "mq:"
const TargetAll = "all"
const redisTable = 0
// MsgQueue - Creates and initialize a Message Queue instance
func NewMsgQueue(name string, moduleName string, moduleNamespace string, addr string) (*MsgQueue, error) {
	// Validate input params
	if name == "" {
		err = errors.New("Invalid name")
		log.Error(err.Error())
		return nil, err
	}
	if moduleName == "" {
		err = errors.New("Invalid name or namespace")
		log.Error(err.Error())
		return nil, err
	}
	if moduleNamespace == "" {
		err = errors.New("Invalid module namespace name or namespace")
		log.Error(err.Error())
		return nil, err
	}
	// Create new Message Queue
	log.Info("Creating new MsgQueue")
	mq := new(MsgQueue)
	mq.name = name
	mq.moduleName = moduleName
	mq.moduleNamespace = moduleNamespace
	mq.counter = 0
	mq.handlers = make(map[int]MsgHandler)

	// Connect to Redis DB
	mq.rc, err = redis.NewConnector(addr, redisTable)
	if err != nil {
		log.Error("Failed connection to Message Queue redis DB. Error: ", err)
		return nil, err
	}
	log.Info("Connected to Message Queue Redis DB")
	return mq, nil
}

// CreateMsg - Create a new message
func (mq *MsgQueue) CreateMsg(message Message, dstName string, dstNamespace string) *Msg {
	msg := new(Msg)
	msg.SrcName = mq.moduleName
	msg.SrcNamespace = mq.moduleNamespace
	msg.DstName = dstName
	msg.DstNamespace = dstNamespace
	msg.Scope = mq.name
	msg.Message = message
	msg.Payload = make(map[string]string)
	return msg
}

// SendMsg - Send the provided message
func (mq *MsgQueue) SendMsg(msg *Msg) error {
	// Validate message format
	err := mq.validateMsg(msg)
	if err != nil {
		log.Error("Message validation failed with err: ", err.Error())
		return err
	}
	// Validate message source
	if msg.SrcName != mq.moduleName || msg.SrcNamespace != mq.moduleNamespace {
		err = errors.New("Message source not equal to Msg Queue module name/namespace")
		log.Error(err.Error())
		return err
	}
	log.Trace("Sending message: ", PrintMsg(msg))

	// Marshal message
	jsonMsg, err := json.Marshal(msg)
	if err != nil {
		log.Error("Failed to marshal message with err: ", err.Error())
		return err
	}

	// Publish message on queue
	err = mq.rc.Publish(mq.name, string(jsonMsg))
	if err != nil {
		log.Error("Failed to publish message on queue ", mq.name, " with err: ", err.Error())
		return err
// Register - Add a message handler
func (mq *MsgQueue) RegisterHandler(handler MsgHandler) (id int, err error) {

	// Validate handler
	if handler.Handler == nil {
		err = errors.New("Invalid handler")
	// Add Handler
	mq.counter++
	mq.handlers[mq.counter] = handler
	// Start listening for messages if first handler
	if len(mq.handlers) == 1 {
		// Subscribe to channels
		err = mq.rc.Subscribe([]string{mq.name}...)
		if err != nil {
			log.Error("Failed to subscribe to channels with err: ", err.Error())
			delete(mq.handlers, mq.counter)
			return
		// Start goroutine to listen on subscribed channels
		go func() {
			err := mq.rc.Listen(mq.eventHandler)
			if err != nil {
				log.Error("Error listening on subscribed channels: ", err.Error())
			}
			log.Info("Exiting listener goroutine")
		}()

		// Give the Listener time to create the stop channel
		time.Sleep(100 * time.Millisecond)
	}
	// Return handler ID
	return mq.counter, nil
// Unregister - Remove a message handler
func (mq *MsgQueue) UnregisterHandler(id int) {
	// lock.Lock()
	// defer lock.Unlock()

	// Remove handler
	delete(mq.handlers, id)

	// Stop listening if no more handlers
	if len(mq.handlers) == 0 {
		mq.rc.StopListen()
		_ = mq.rc.Unsubscribe([]string{mq.name}...)
}

// Event handler
func (mq *MsgQueue) eventHandler(channel string, payload string) {
	log.Trace("Received message on channel[", channel, "]")

	// Unmarshal message
	msg := new(Msg)
	err := json.Unmarshal([]byte(payload), msg)
	if err != nil {
		log.Error("Failed to unmarshal message")
		return
	}

	// Validate message format
	err = mq.validateMsg(msg)
	if err != nil {
		log.Error("Message validation failed with err: ", err.Error())
		return
	}
	// Validate message destination
	if (msg.DstName != TargetAll && msg.DstName != mq.moduleName) ||
		(msg.DstNamespace != TargetAll && msg.DstNamespace != mq.moduleNamespace) {
		log.Trace("Ignoring message with other destination")
	log.Trace("Received message: ", PrintMsg(msg))

	// Invoke registered handlers
	for _, handler := range mq.handlers {
		handler.Handler(msg, handler.UserData)
	}
}

// Validate message format
func (mq *MsgQueue) validateMsg(msg *Msg) error {
	if msg == nil {
		return errors.New("nil message")
	}
	if msg.SrcName == "" || msg.SrcNamespace == "" {
		return errors.New("Invalid source")
	}
	if msg.DstName == "" || msg.DstNamespace == "" {
		return errors.New("Invalid destination")
	}
	if msg.Scope != mq.name {
		return errors.New("Invalid scope")
	}
	if msg.Message == "" {
		return errors.New("Invalid message type")
	}
	return nil
}

// GetGlobalName - Get global queue name
func GetGlobalName() string {
	return globalQueueName
}

// GetLocalName - Get local namespace-specific queue name
func GetLocalName(namespace string) string {
	return localQueueNamePrefix + namespace
// Convert message to string
func PrintMsg(msg *Msg) string {
	msgStr := "Message[" + string(msg.Message) +
		"] Src[" + msg.SrcNamespace + ":" + msg.SrcName +
		"] Dst[" + msg.DstNamespace + ":" + msg.DstName +
		"] Scope[" + msg.Scope +
		"] Payload[" + fmt.Sprintf("%+v", msg.Payload) + "]"

	return msgStr
}