Loading go-packages/meep-logger/logger.go +6 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,12 @@ import ( var componentName string func MeepTextLogInit(name string) { log.SetFormatter(&log.TextFormatter{}) log.SetLevel(log.DebugLevel) componentName = name } func MeepJSONLogInit(name string) { log.SetFormatter(&log.JSONFormatter{}) log.SetLevel(log.DebugLevel) Loading go-packages/meep-redis/db.go +105 −59 Original line number Diff line number Diff line Loading @@ -11,8 +11,7 @@ package redisdb import ( "errors" "net" "reflect" "strings" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -26,47 +25,51 @@ var dbClientStarted = false var pubsub *redis.PubSub const ModuleCtrlEngine string = "ctrl-engine" const TypeActive string = "active" const ChannelCtrlActive string = ModuleCtrlEngine + "-" + TypeActive // Connector - Implements a Redis connector type Connector struct { addr string connected bool client *rejonson.Client pubsub *redis.PubSub isListening bool doneListening chan bool } // DBConnect - Establish connection to DB func DBConnect(addr string) error { if !dbClientStarted { err := openDB(addr) // NewConnector - Creates and initialize a Redis connector func NewConnector(addr string) (rc *Connector, err error) { rc = new(Connector) err = rc.connectDB(addr) if err != nil { return err } return nil, err } return nil return rc, nil } func openDB(addr string) error { func (rc *Connector) connectDB(addr string) error { if addr == "" { addr = "meep-redis-master:6379" rc.addr = "meep-redis-master:6379" } else { rc.addr = addr } redisClient := redis.NewClient(&redis.Options{ log.Debug("Redis Connector connecting to ", rc.addr) Addr: addr, redisClient := redis.NewClient(&redis.Options{ Addr: rc.addr, Password: "", // no password set DB: 0, // use default DB }) rejonsonClient := rejonson.ExtendClient(redisClient) rc.client = rejonson.ExtendClient(redisClient) pong, err := rejonsonClient.Ping().Result() pong, err := rc.client.Ping().Result() if pong == "" { log.Info("pong is null") if pong == "" || err != nil { log.Error("Redis Connector unable to connect ", rc.addr) return err } if err != nil { log.Info("Redis DB not accessible") return err } dbClientStarted = true dbClient = rejonsonClient log.Info("Redis DB opened and well!") rc.connected = true log.Info("Redis Connector connected to ", rc.addr) return nil } Loading Loading @@ -162,10 +165,13 @@ func openDB(addr string) error { // return nil // } // DBJsonGetEntry - Retrieve entry from DB func DBJsonGetEntry(key string, path string) (string, error) { // JSONGetEntry - Retrieve entry from DB func (rc *Connector) JSONGetEntry(key string, path string) (string, error) { if !rc.connected { return "", errors.New("Redis Connector is disconnected (JSONGetEntry)") } // Update existing entry or create new entry if it does not exist json, err := dbClient.JsonGet(key, path).Result() json, err := rc.client.JsonGet(key, path).Result() if err != nil { return "", err } Loading @@ -192,49 +198,89 @@ func DBJsonGetEntry(key string, path string) (string, error) { // } // Subscribe - Register as a listener for provided channels func Subscribe(channels ...string) error { pubsub = dbClient.Subscribe(channels...) func (rc *Connector) Subscribe(channels ...string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Subscribe)") } rc.pubsub = rc.client.Subscribe(channels...) return nil } // Listen - Wait for subscribed events func Listen(handler func(string, string)) error { // Unsubscribe - Unregister as a listener for provided channels func (rc *Connector) Unsubscribe(channels ...string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Unsubscribe)") } if rc.pubsub != nil { rc.pubsub.Unsubscribe(channels...) } return nil } // Make sure listener is subscribed to pubsub if pubsub == nil { return errors.New("Not subscribed to pubsub") // Listen - Wait for subscribed events func (rc *Connector) Listen(handler func(string, string)) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Listen)") } if rc.pubsub == nil { return errors.New("Not subscribed to pubsub (Listen)") } rc.isListening = true rc.doneListening = make(chan bool, 1) // Main listening loop for { // Wait for subscribed channel events, or timeout msg, err := pubsub.ReceiveTimeout(time.Second) msg, err := rc.pubsub.ReceiveTimeout(time.Second) if err != nil { if reflect.TypeOf(err) == reflect.TypeOf(&net.OpError{}) && reflect.TypeOf(err.(*net.OpError).Err).String() == "*net.timeoutError" { // Timeout, ignore and wait for next event continue } if !strings.Contains(err.Error(), "timeout") { log.Debug("Listen Error: ", err) } } else { channel := "" payload := "" // Process published event switch m := msg.(type) { // Process Subscription case *redis.Subscription: log.Info("Subscription Message: ", m.Kind, " to channel ", m.Channel, ". Total subscriptions: ", m.Count) continue // Process received Message case *redis.Message: log.Info("MSG on ", m.Channel, ": ", m.Payload) handler(m.Channel, m.Payload) channel = m.Channel payload = m.Payload log.Info("RX-MSG [", channel, "] ", payload) handler(channel, payload) } } if !rc.isListening { log.Debug("Redis Connector exiting listen routine") rc.doneListening <- true return nil } } } // // Publish - Publish message to channel // func Publish(channel string, message string) error { // log.Info("Publish to channel: ", channel, " Message: ", message) // _, err := dbClient.Publish(channel, message).Result() // return err // } // StopListen - Stop the listening goroutine func (rc *Connector) StopListen() { if rc.isListening { // stop the listen goroutine rc.isListening = false // synchronize on completion <-rc.doneListening } } // Publish - Publish message to channel func (rc *Connector) Publish(channel string, message string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Publish)") } log.Info("TX-MSG [", channel, "] ", message) _, err := rc.client.Publish(channel, message).Result() return err } Loading
go-packages/meep-logger/logger.go +6 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,12 @@ import ( var componentName string func MeepTextLogInit(name string) { log.SetFormatter(&log.TextFormatter{}) log.SetLevel(log.DebugLevel) componentName = name } func MeepJSONLogInit(name string) { log.SetFormatter(&log.JSONFormatter{}) log.SetLevel(log.DebugLevel) Loading
go-packages/meep-redis/db.go +105 −59 Original line number Diff line number Diff line Loading @@ -11,8 +11,7 @@ package redisdb import ( "errors" "net" "reflect" "strings" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -26,47 +25,51 @@ var dbClientStarted = false var pubsub *redis.PubSub const ModuleCtrlEngine string = "ctrl-engine" const TypeActive string = "active" const ChannelCtrlActive string = ModuleCtrlEngine + "-" + TypeActive // Connector - Implements a Redis connector type Connector struct { addr string connected bool client *rejonson.Client pubsub *redis.PubSub isListening bool doneListening chan bool } // DBConnect - Establish connection to DB func DBConnect(addr string) error { if !dbClientStarted { err := openDB(addr) // NewConnector - Creates and initialize a Redis connector func NewConnector(addr string) (rc *Connector, err error) { rc = new(Connector) err = rc.connectDB(addr) if err != nil { return err } return nil, err } return nil return rc, nil } func openDB(addr string) error { func (rc *Connector) connectDB(addr string) error { if addr == "" { addr = "meep-redis-master:6379" rc.addr = "meep-redis-master:6379" } else { rc.addr = addr } redisClient := redis.NewClient(&redis.Options{ log.Debug("Redis Connector connecting to ", rc.addr) Addr: addr, redisClient := redis.NewClient(&redis.Options{ Addr: rc.addr, Password: "", // no password set DB: 0, // use default DB }) rejonsonClient := rejonson.ExtendClient(redisClient) rc.client = rejonson.ExtendClient(redisClient) pong, err := rejonsonClient.Ping().Result() pong, err := rc.client.Ping().Result() if pong == "" { log.Info("pong is null") if pong == "" || err != nil { log.Error("Redis Connector unable to connect ", rc.addr) return err } if err != nil { log.Info("Redis DB not accessible") return err } dbClientStarted = true dbClient = rejonsonClient log.Info("Redis DB opened and well!") rc.connected = true log.Info("Redis Connector connected to ", rc.addr) return nil } Loading Loading @@ -162,10 +165,13 @@ func openDB(addr string) error { // return nil // } // DBJsonGetEntry - Retrieve entry from DB func DBJsonGetEntry(key string, path string) (string, error) { // JSONGetEntry - Retrieve entry from DB func (rc *Connector) JSONGetEntry(key string, path string) (string, error) { if !rc.connected { return "", errors.New("Redis Connector is disconnected (JSONGetEntry)") } // Update existing entry or create new entry if it does not exist json, err := dbClient.JsonGet(key, path).Result() json, err := rc.client.JsonGet(key, path).Result() if err != nil { return "", err } Loading @@ -192,49 +198,89 @@ func DBJsonGetEntry(key string, path string) (string, error) { // } // Subscribe - Register as a listener for provided channels func Subscribe(channels ...string) error { pubsub = dbClient.Subscribe(channels...) func (rc *Connector) Subscribe(channels ...string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Subscribe)") } rc.pubsub = rc.client.Subscribe(channels...) return nil } // Listen - Wait for subscribed events func Listen(handler func(string, string)) error { // Unsubscribe - Unregister as a listener for provided channels func (rc *Connector) Unsubscribe(channels ...string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Unsubscribe)") } if rc.pubsub != nil { rc.pubsub.Unsubscribe(channels...) } return nil } // Make sure listener is subscribed to pubsub if pubsub == nil { return errors.New("Not subscribed to pubsub") // Listen - Wait for subscribed events func (rc *Connector) Listen(handler func(string, string)) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Listen)") } if rc.pubsub == nil { return errors.New("Not subscribed to pubsub (Listen)") } rc.isListening = true rc.doneListening = make(chan bool, 1) // Main listening loop for { // Wait for subscribed channel events, or timeout msg, err := pubsub.ReceiveTimeout(time.Second) msg, err := rc.pubsub.ReceiveTimeout(time.Second) if err != nil { if reflect.TypeOf(err) == reflect.TypeOf(&net.OpError{}) && reflect.TypeOf(err.(*net.OpError).Err).String() == "*net.timeoutError" { // Timeout, ignore and wait for next event continue } if !strings.Contains(err.Error(), "timeout") { log.Debug("Listen Error: ", err) } } else { channel := "" payload := "" // Process published event switch m := msg.(type) { // Process Subscription case *redis.Subscription: log.Info("Subscription Message: ", m.Kind, " to channel ", m.Channel, ". Total subscriptions: ", m.Count) continue // Process received Message case *redis.Message: log.Info("MSG on ", m.Channel, ": ", m.Payload) handler(m.Channel, m.Payload) channel = m.Channel payload = m.Payload log.Info("RX-MSG [", channel, "] ", payload) handler(channel, payload) } } if !rc.isListening { log.Debug("Redis Connector exiting listen routine") rc.doneListening <- true return nil } } } // // Publish - Publish message to channel // func Publish(channel string, message string) error { // log.Info("Publish to channel: ", channel, " Message: ", message) // _, err := dbClient.Publish(channel, message).Result() // return err // } // StopListen - Stop the listening goroutine func (rc *Connector) StopListen() { if rc.isListening { // stop the listen goroutine rc.isListening = false // synchronize on completion <-rc.doneListening } } // Publish - Publish message to channel func (rc *Connector) Publish(channel string, message string) error { if !rc.connected { return errors.New("Redis Connector is disconnected (Publish)") } log.Info("TX-MSG [", channel, "] ", message) _, err := rc.client.Publish(channel, message).Result() return err }