Commit 7cf030c4 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

meep-mq initial implemenation

parent 1cd857cb
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
module github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq

go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
)

replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+28 −0
Original line number Diff line number Diff line
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db/go.mod h1:RU+6d0CNIRSp6yo1mXLIIrnFa/3LHhvcDVLVJyovptM=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351 h1:1u1XrfCBnY+GijnyU6O1k4odp5TnqZQTsp5v7+n/E4Y=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351/go.mod h1:HxwfbuElTuGf+/uKZfjJrCnv0BmmpkPJDI7gBwj1KkM=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+273 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2020  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package mq

import (
	"encoding/json"
	"errors"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

type Msg struct {
	SrcName      string            `json:"src,omitempty"`
	SrcNamespace string            `json:"src-ns,omitempty"`
	DstName      string            `json:"dst,omitempty"`
	DstNamespace string            `json:"dst-ns,omitempty"`
	Scope        string            `json:"scope,omitempty"`
	Message      Message           `json:"msg,omitempty"`
	Payload      map[string]string `json:"payload,omitempty"`
}

type MsgHandler func(msg *Msg)

type MsgQueue struct {
	name      string
	namespace string
	global    string
	local     string
	rc        *redis.Connector
	handler   MsgHandler
}

// Messages
type Message string

const (
	MsgSandboxCreate     Message = "SANDBOX-CREATE"
	MsgSandboxDestroy    Message = "SANDBOX-DESTROY"
	MsgScenarioActivate  Message = "SCENARIO-ACTIVATE"
	MsgScenarioTerminate Message = "SCENARIO-TERMINATE"
)

// Scopes
const (
	ScopeLocal  = "local"
	ScopeGlobal = "global"
	ScopeAll    = "all"
)

const TargetAll = "all"
const redisTable = 0
const globalQueueName = "mq:global"

// MsgQueue - Creates and initialize a Message Queue instance
func NewMsgQueue(name string, namespace string, redisAddr string) (mq *MsgQueue, err error) {
	// Validate name & namespace
	if name == "" || namespace == "" {
		err = errors.New("Invalid name or namespace")
		log.Error(err.Error())
		return nil, err
	}

	// Create new Message Queue instance
	mq = new(MsgQueue)
	mq.name = name
	mq.namespace = namespace
	mq.global = globalQueueName
	mq.local = "mq:local-" + namespace
	mq.handler = nil

	// Connect to Redis DB
	mq.rc, err = redis.NewConnector(redisAddr, redisTable)
	if err != nil {
		log.Error("Failed connection to Message Queue redis DB. Error: ", err)
		return nil, err
	}
	log.Info("Connected to Message Queue Redis DB")

	return mq, nil
}

// CreateMsg - Create a new message
func (mq *MsgQueue) CreateMsg(dstName string, dstNamespace string, scope string, message Message) *Msg {
	msg := new(Msg)
	msg.SrcName = mq.name
	msg.SrcNamespace = mq.namespace
	msg.DstName = dstName
	msg.DstNamespace = dstNamespace
	msg.Scope = scope
	msg.Message = message
	msg.Payload = make(map[string]string)
	return msg
}

// SendMsg - Send the provided message
func (mq *MsgQueue) SendMsg(msg *Msg) error {
	// Validate message format
	err := validateMsg(msg)
	if err != nil {
		log.Error("Message validation failed with err: ", err.Error())
		return err
	}
	// Validate message source
	if msg.SrcName != mq.name || msg.SrcNamespace != mq.namespace {
		err = errors.New("Message source not equal to Msg Queue name/namespace")
		log.Error(err.Error())
		return err
	}
	log.Debug("Sending message: ", msgToStr(msg))

	// Marshal message
	jsonMsg, err := json.Marshal(msg)
	if err != nil {
		log.Error("Failed to marshal message with err: ", err.Error())
		return err
	}

	// Publish on local queue if scope permits
	if msg.Scope == ScopeLocal || msg.Scope == ScopeAll {
		err = mq.rc.Publish(mq.local, string(jsonMsg))
		if err != nil {
			log.Error("Failed to publish message on local queue ", mq.local, " with err: ", err.Error())
			return err
		}
	}
	// Publish on global queue if scope permits
	if msg.Scope == ScopeGlobal || msg.Scope == ScopeAll {
		err = mq.rc.Publish(mq.global, string(jsonMsg))
		if err != nil {
			log.Error("Failed to publish message on global queue ", mq.global, " with err: ", err.Error())
			return err
		}
	}

	return nil
}

// Listen - Register a message handler and listen for messages
func (mq *MsgQueue) Listen(handler MsgHandler, scope string) (err error) {
	// Validate handler
	if handler == nil {
		err = errors.New("Invalid handler")
		log.Error(err.Error())
		return err
	}
	// Make sure we are not already listening
	if mq.handler != nil {
		err = errors.New("MsgQueue handler already registered")
		log.Error(err.Error())
		return err
	}

	// Get list of channels
	var channels []string
	switch scope {
	case ScopeLocal:
		channels = []string{mq.local}
	case ScopeGlobal:
		channels = []string{mq.global}
	case ScopeAll:
		channels = []string{mq.local, mq.global}
	default:
		err = errors.New("Invalid scope")
		log.Error(err.Error())
		return err
	}

	// Subscribe to channels
	err = mq.rc.Subscribe(channels...)
	if err != nil {
		log.Error("Failed to subscribe to channels with err: ", err.Error())
		return err
	}

	// Store handler
	mq.handler = handler

	// Start goroutine to listen on subscribed channels
	go func() {
		err := mq.rc.Listen(mq.eventHandler)
		if err != nil {
			log.Error("Error listening on subscribed channels: ", err.Error())
		}
		log.Info("Exiting listener goroutine")
	}()

	// Give the Listener time to create the stop channel
	time.Sleep(100 * time.Millisecond)

	return nil
}

// StopListen - Stop the listening goroutine
func (mq *MsgQueue) StopListen() {
	mq.rc.StopListen()
	_ = mq.rc.Unsubscribe([]string{mq.local, mq.global}...)
	mq.handler = nil
}

// Event handler
func (mq *MsgQueue) eventHandler(channel string, payload string) {
	log.Debug("Received message on channel[", channel, "]")

	// Unmarshal message
	msg := new(Msg)
	err := json.Unmarshal([]byte(payload), msg)
	if err != nil {
		log.Error("Failed to unmarshal message")
		return
	}

	// Validate message format
	err = validateMsg(msg)
	if err != nil {
		log.Error("Message validation failed with err: ", err.Error())
		return
	}
	// Validate message destination
	if (msg.DstName != TargetAll && msg.DstName != mq.name) ||
		(msg.DstNamespace != TargetAll && msg.DstNamespace != mq.namespace) {
		log.Debug("Destination does not match Msg Queue name... ignoring message")
		return
	}
	log.Debug("Received message: ", msgToStr(msg))

	// Invoke registered handler
	if mq.handler != nil {
		mq.handler(msg)
	}
}

// Validate message format
func validateMsg(msg *Msg) error {
	if msg == nil {
		return errors.New("nil message")
	}
	if msg.SrcName == "" || msg.SrcNamespace == "" {
		return errors.New("Invalid source")
	}
	if msg.DstName == "" || msg.DstNamespace == "" {
		return errors.New("Invalid destination")
	}
	if msg.Scope != ScopeLocal && msg.Scope != ScopeGlobal && msg.Scope != ScopeAll {
		return errors.New("Invalid scope")
	}
	if msg.Message == "" {
		return errors.New("Invalid message type")
	}
	return nil
}

// Convert message to string
func msgToStr(msg *Msg) string {
	msgStr := "Message[" + string(msg.Message) + "] Src[" + msg.SrcNamespace + ":" + msg.SrcName + "] Dst[" +
		msg.DstNamespace + ":" + msg.DstName + "] Scope[" + msg.Scope + "] Payload[]"
	return msgStr
}
+340 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package mq

import (
	"fmt"
	"testing"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const mqRedisAddr string = "localhost:30380"
const mqName string = "name"
const mqNamespace string = "sbox-1"

var rxMsg *Msg = nil
var rxMsgUpdated bool = false

func TestMsgQueueNew(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())
	var mq *MsgQueue
	var err error

	fmt.Println("Invalid Message Queue")
	mq, err = NewMsgQueue("", mqNamespace, mqRedisAddr)
	if err == nil || mq != nil {
		t.Fatalf("Message Queue creation should have failed")
	}
	mq, err = NewMsgQueue(mqName, "", mqRedisAddr)
	if err == nil || mq != nil {
		t.Fatalf("Message Queue creation should have failed")
	}

	fmt.Println("Create Message Queue")
	mq, err = NewMsgQueue(mqName, mqNamespace, mqRedisAddr)
	if err != nil {
		t.Fatalf("Unable to create Message Queue")
	}
	if mq.name != mqName || mq.namespace != mqNamespace {
		t.Fatalf("Invalid Message Queue")
	}
}

func TestMsgQueueSendMsg(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())
	var mq *MsgQueue
	var msg *Msg
	var err error

	fmt.Println("Create Message Queue")
	mq, err = NewMsgQueue(mqName, mqNamespace, mqRedisAddr)
	if err != nil {
		t.Fatalf("Unable to create Message Queue")
	}

	fmt.Println("Send Message with invalid format")
	err = mq.SendMsg(nil)
	if err == nil {
		t.Fatalf("SendMsg should have failed")
	}
	msg = mq.CreateMsg("", "destination-ns", ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err == nil {
		t.Fatalf("SendMsg should have failed")
	}
	msg = mq.CreateMsg("destination", "", ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err == nil {
		t.Fatalf("SendMsg should have failed")
	}
	msg = mq.CreateMsg("destination", "destination-ns", "invalid", "msg-type")
	err = mq.SendMsg(msg)
	if err == nil {
		t.Fatalf("SendMsg should have failed")
	}
	msg = mq.CreateMsg("destination", "destination-ns", ScopeLocal, "")
	err = mq.SendMsg(msg)
	if err == nil {
		t.Fatalf("SendMsg should have failed")
	}

	fmt.Println("Send valid Message")
	msg = mq.CreateMsg("destination", "destination-ns", ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
}

func TestMsgQueueListen(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())
	var mq *MsgQueue
	var msg *Msg
	var err error

	fmt.Println("Create Message Queue")
	mq, err = NewMsgQueue(mqName, mqNamespace, mqRedisAddr)
	if err != nil {
		t.Fatalf("Unable to create Message Queue")
	}

	fmt.Println("Invalid listen")
	err = mq.Listen(nil, ScopeLocal)
	if err == nil {
		t.Fatalf("Listen should have failed")
	}
	err = mq.Listen(msgHandler, "")
	if err == nil {
		t.Fatalf("Listen should have failed")
	}
	err = mq.Listen(msgHandler, ScopeLocal)
	if err != nil {
		t.Fatalf("Unable to register listener")
	}
	err = mq.Listen(msgHandler, ScopeLocal)
	if err == nil {
		t.Fatalf("Listen should have failed")
	}
	mq.StopListen()

	// SCOPE LOCAL
	fmt.Println("Register message handler for local messages only")
	err = mq.Listen(msgHandler, ScopeLocal)
	if err != nil {
		t.Fatalf("Unable to register listener")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg("invalid", mqNamespace, ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, "invalid", ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeLocal, "msg-type-2")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	mq.StopListen()

	// SCOPE GLOBAL
	fmt.Println("Register message handler for global messages only")
	err = mq.Listen(msgHandler, ScopeGlobal)
	if err != nil {
		t.Fatalf("Unable to register listener")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg("invalid", mqNamespace, ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, "invalid", ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeGlobal, "msg-type-2")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	mq.StopListen()

	// SCOPE ALL
	fmt.Println("Register message handler for local & global messages")
	err = mq.Listen(msgHandler, ScopeAll)
	if err != nil {
		t.Fatalf("Unable to register listener")
	}
	msg = mq.CreateMsg("invalid", mqNamespace, ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, "invalid", ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg("invalid", mqNamespace, ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, "invalid", ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(nil, false) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeLocal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeLocal, "msg-type-2")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeGlobal, "msg-type")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	msg = mq.CreateMsg(mqName, mqNamespace, ScopeGlobal, "msg-type-2")
	err = mq.SendMsg(msg)
	if err != nil {
		t.Fatalf("Unable to send message")
	}
	if !validateRxMsg(msg, true) {
		t.Fatalf("Invalid Rx Message")
	}
	mq.StopListen()
}

var msgHandler MsgHandler = func(msg *Msg) {
	rxMsgUpdated = true
	rxMsg = msg
}

func validateRxMsg(msg *Msg, updated bool) bool {
	// Give time for the message to arrive
	time.Sleep(50 * time.Millisecond)

	// Make sure Message received if expected
	if updated && !rxMsgUpdated || !updated && rxMsgUpdated {
		return false
	}
	rxMsgUpdated = false

	// Validate message contents
	if msg != nil {
		if rxMsg == nil ||
			msg.SrcName != rxMsg.SrcName ||
			msg.SrcNamespace != rxMsg.SrcNamespace ||
			msg.DstName != rxMsg.DstName ||
			msg.DstNamespace != rxMsg.DstNamespace ||
			msg.Scope != rxMsg.Scope ||
			msg.Message != rxMsg.Message {
			return false
		}
		rxMsg = nil
	} else if rxMsg != nil {
		return false
	}

	return true
}
+1 −1
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ func NewConnector(addr string, table int) (rc *Connector, err error) {

func (rc *Connector) connectDB(addr string, table int) error {
	if addr == "" {
		rc.addr = "meep-redis-master:6379"
		rc.addr = "meep-redis-master.default.svc.cluster.local:6379"
	} else {
		rc.addr = addr
	}