Unverified Commit de0c2c7c authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #23 from pastorsx/sp_dev_sp29_bws

Bandwidth Sharing (Tier 1)
parents c7bb6e87 52e815d6
Loading
Loading
Loading
Loading
+4 −5
Original line number Diff line number Diff line
@@ -3,12 +3,11 @@ module github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-tc-engine
go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-bw-sharing v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model v0.0.0
	github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db // indirect
	github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351
	github.com/go-redis/redis v6.15.2+incompatible
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/gogo/protobuf v1.2.1 // indirect
	github.com/google/btree v1.0.0 // indirect
	github.com/google/gofuzz v1.0.0 // indirect
@@ -19,8 +18,6 @@ require (
	github.com/json-iterator/go v1.1.6 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.1 // indirect
	github.com/onsi/ginkgo v1.8.0 // indirect
	github.com/onsi/gomega v1.5.0 // indirect
	github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
	github.com/spf13/pflag v1.0.3 // indirect
	golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a // indirect
@@ -36,7 +33,9 @@ require (
)

replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-bw-sharing => ../../go-packages/meep-bw-sharing
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model => ../../go-packages/meep-ctrl-engine-model
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model => ../../go-packages/meep-mg-manager-model
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+2 −13
Original line number Diff line number Diff line
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db h1:Zkf5kwhxdW0xV7WM/crqIcOP5LCFGnAmumWSFAewJ74=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db/go.mod h1:RU+6d0CNIRSp6yo1mXLIIrnFa/3LHhvcDVLVJyovptM=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351 h1:1u1XrfCBnY+GijnyU6O1k4odp5TnqZQTsp5v7+n/E4Y=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351/go.mod h1:HxwfbuElTuGf+/uKZfjJrCnv0BmmpkPJDI7gBwj1KkM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
@@ -25,7 +22,6 @@ github.com/gorilla/mux v1.7.1 h1:Dw4jY2nghMMRsh1ol8dv1axHkDwMQK2DHerMNJsIpJU=
github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc h1:f8eY6cV/x1x+HLjOp4r72s/31/V2aTUtg5oKRRPf8/Q=
github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@@ -38,20 +34,17 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a h1:Igim7XhdOpBnWPuYJ70XcNpq8q3BCACtVgNfoJxOV7g=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
@@ -62,7 +55,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -75,13 +67,10 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+0 −239
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package server

import (
	"errors"
	"net"
	"reflect"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	"github.com/KromDaniel/rejonson"
	"github.com/go-redis/redis"
)

var dbClient *rejonson.Client
var dbClientStarted = false

var pubsub *redis.PubSub

// DBConnect - Establish connection to DB
func DBConnect() error {
	if !dbClientStarted {
		err := openDB()
		if err != nil {
			return err
		}
	}
	return nil
}

func openDB() error {
	redisClient := redis.NewClient(&redis.Options{
		Addr:     "meep-redis-master:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	rejonsonClient := rejonson.ExtendClient(redisClient)

	pong, err := rejonsonClient.Ping().Result()
	if pong == "" {
		log.Info("pong is null")
		return err
	}
	if err != nil {
		log.Info("Redis DB not accessible")
		return err
	}

	dbClientStarted = true
	dbClient = rejonsonClient

	log.Info("Redis DB opened and well!")
	return nil
}

// DBFlush - Empty DB
func DBFlush(module string) error {
	var cursor uint64
	var err error
	log.Debug("DBFlush module: ", module)

	// Find all module keys
	// Process in chunks of 50 matching entries to optimize processing speed & memory
	keyMatchStr := module + ":*"
	for {
		var keys []string
		keys, cursor, err = dbClient.Scan(cursor, keyMatchStr, 50).Result()
		if err != nil {
			log.Debug("ERROR: ", err)
			break
		}

		// Delete all matching entries
		if len(keys) > 0 {
			_, err = dbClient.Del(keys...).Result()
			if err != nil {
				log.Debug("Failed to retrieve entry fields")
				break
			}
		}

		// Stop searching if cursor is back at beginning
		if cursor == 0 {
			break
		}
	}

	return nil
}

// DBSetEntry - Update existing entry or create new entry if it does not exist
func DBSetEntry(key string, fields map[string]interface{}) error {
	// Update existing entry or create new entry if it does not exist
	_, err := dbClient.HMSet(key, fields).Result()
	if err != nil {
		return err
	}
	return nil
}

// DBRemoveEntry - Remove existing entries
func DBRemoveEntry(keys ...string) error {
	_, err := dbClient.Del(keys...).Result()
	if err != nil {
		return err
	}
	return nil
}

// DBForEachEntry - Search for matching keys and run handler for each entry
func DBForEachEntry(keyMatchStr string, entryHandler func(string, map[string]string, interface{}) error, userData interface{}) error {
	var cursor uint64
	var err error

	// Process in chunks of 50 matching entries to optimize processing speed & memory
	for {
		var keys []string
		keys, cursor, err = dbClient.Scan(cursor, keyMatchStr, 50).Result()
		if err != nil {
			log.Debug("ERROR: ", err)
			break
		}

		if len(keys) > 0 {
			for i := 0; i < len(keys); i++ {
				fields, err := dbClient.HGetAll(keys[i]).Result()
				if err != nil || fields == nil {
					log.Debug("Failed to retrieve entry fields")
					break
				}

				// Invoke handler to process entry
				err = entryHandler(keys[i], fields, userData)
				if err != nil {
					return err
				}
			}
		}

		// Stop searching if cursor is back at beginning
		if cursor == 0 {
			break
		}
	}
	return nil
}

// DBJsonGetEntry - Retrieve entry from DB
func DBJsonGetEntry(key string, path string) (string, error) {
	// Update existing entry or create new entry if it does not exist
	json, err := dbClient.JsonGet(key, path).Result()
	if err != nil {
		return "", err
	}
	return json, nil
}

// DBJsonSetEntry - Update existing entry or create new entry if it does not exist
func DBJsonSetEntry(key string, path string, json string) error {
	// Update existing entry or create new entry if it does not exist
	_, err := dbClient.JsonSet(key, path, json).Result()
	if err != nil {
		return err
	}
	return nil
}

// DBJsonDelEntry - Remove existing entry
func DBJsonDelEntry(key string, path string) error {
	_, err := dbClient.JsonDel(key, path).Result()
	if err != nil {
		return err
	}
	return nil
}

// Subscribe - Register as a listener for provided channels
func Subscribe(channels ...string) error {
	pubsub = dbClient.Subscribe(channels...)
	return nil
}

// Listen - Wait for subscribed events
func Listen(handler func(string, string)) error {

	// Make sure listener is subscribed to pubsub
	if pubsub == nil {
		return errors.New("Not subscribed to pubsub")
	}

	// Main listening loop
	for {
		// Wait for subscribed channel events, or timeout
		msg, err := pubsub.ReceiveTimeout(time.Second)
		if err != nil {
			if reflect.TypeOf(err) == reflect.TypeOf(&net.OpError{}) &&
				reflect.TypeOf(err.(*net.OpError).Err).String() == "*net.timeoutError" {
				// Timeout, ignore and wait for next event
				continue
			}
		}

		// Process published event
		switch m := msg.(type) {

		// Process Subscription
		case *redis.Subscription:
			log.Info("Subscription Message: ", m.Kind, " to channel ", m.Channel, ". Total subscriptions: ", m.Count)

		// Process received Message
		case *redis.Message:
			log.Info("MSG on ", m.Channel, ": ", m.Payload)
			handler(m.Channel, m.Payload)
		}
	}
}

// Publish - Publish message to channel
func Publish(channel string, message string) error {
	log.Info("Publish to channel: ", channel, " Message: ", message)
	_, err := dbClient.Publish(channel, message).Result()
	return err
}
+215 −49

File changed.

Preview size limit exceeded, changes collapsed.

go-apps/meep-tc-sidecar/db.go

deleted100755 → 0
+0 −180
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"net"
	"reflect"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	"github.com/go-redis/redis"
)

var dbClient *redis.Client
var dbClientStarted = false

var pubsub *redis.PubSub

// DBConnect - Establish connection to DB
func DBConnect() error {
	if !dbClientStarted {
		err := openDB()
		if err != nil {
			return err
		}
	}
	return nil
}

func openDB() error {

	db := redis.NewClient(&redis.Options{
		Addr:     "meep-redis-master:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	pong, err := db.Ping().Result()

	if pong == "" {
		log.Info("pong is null")
		return err
	}

	if err != nil {
		log.Info("Redis DB not accessible")
		return err
	}
	dbClientStarted = true
	dbClient = db
	return nil
}

// DBForEachEntry - Search for matching keys and run handler for each entry
func DBForEachEntry(keyMatchStr string, entryHandler func(string, map[string]string, interface{}) error, userData interface{}) error {
	var cursor uint64
	var err error

	// Process in chunks of 50 matching entries to optimize processing speed & memory
	for {
		var keys []string
		keys, cursor, err = dbClient.Scan(cursor, keyMatchStr, 50).Result()
		if err != nil {
			log.Debug("ERROR: ", err)
			break
		}

		if len(keys) > 0 {
			for i := 0; i < len(keys); i++ {
				fields, err := dbClient.HGetAll(keys[i]).Result()
				if err != nil || fields == nil {
					log.Debug("Failed to retrieve entry fields")
					break
				}

				// Invoke handler to process entry
				err = entryHandler(keys[i], fields, userData)
				if err != nil {
					return err
				}
			}
		}

		// Stop searching if cursor is back at beginning
		if cursor == 0 {
			break
		}
	}
	return nil
}

// DBEntryExists - true if entry exists; false otherwise
func DBEntryExists(key string) bool {
	value := dbClient.Exists(key).Val()
	return value != 0
}

// // DBAddEntry - Add entry to DB
// func DBAddEntry(key string, fields map[string]string) error {
// 	m := convertMapStrStrToMapStrInt(fields)
// 	_, err := dbClient.HMSet(key, m).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// func convertMapStrStrToMapStrInt(src map[string]string) (dst map[string]interface{}) {
// 	dst = make(map[string]interface{})
// 	for key, value := range src {
// 		dst[key] = value
// 	}
// 	return dst
// }

// // DBRemoveEntry - Remove entry from DB
// func DBRemoveEntry(key string) error {
// 	_, err := dbClient.Del(key).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// Subscribe - Register as a listener for provided channels
func Subscribe(channels ...string) error {
	pubsub = dbClient.Subscribe(channels...)
	return nil
}

// Listen - Wait for subscribed events
func Listen(handler func(string, string)) error {

	// Make sure listener is subscribed to pubsub
	if pubsub == nil {
		return errors.New("Not subscribed to pubsub")
	}

	// Main listening loop
	for {
		// Wait for subscribed channel events, or timeout
		msg, err := pubsub.ReceiveTimeout(time.Second)
		if err != nil {
			if reflect.TypeOf(err) == reflect.TypeOf(&net.OpError{}) &&
				reflect.TypeOf(err.(*net.OpError).Err).String() == "*net.timeoutError" {
				// Timeout, ignore and wait for next event
				continue
			}
		}

		// Process published event
		switch m := msg.(type) {

		// Process Subscription
		case *redis.Subscription:
			log.Info("Subscription Message: ", m.Kind, " to channel ", m.Channel, ". Total subscriptions: ", m.Count)

		// Process received Message
		case *redis.Message:
			log.Info("MSG on ", m.Channel, ": ", m.Payload)
			handler(m.Channel, m.Payload)
		}
	}
}
Loading