Commit 0dbfc858 authored by Michel Roy's avatar Michel Roy Committed by Kevin Di Lallo
Browse files

using meep-model package

parent 755ae833
Loading
Loading
Loading
Loading

go-apps/meep-webhook/db.go

deleted100755 → 0
+0 −240
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"net"
	"reflect"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	"github.com/KromDaniel/rejonson"
	"github.com/go-redis/redis"
)

var dbClient *rejonson.Client
var dbClientStarted = false

var pubsub *redis.PubSub

// DBConnect - Establish connection to DB
func DBConnect() error {
	if !dbClientStarted {
		err := openDB()
		if err != nil {
			return err
		}
	}
	return nil
}

func openDB() error {

	redisClient := redis.NewClient(&redis.Options{
		Addr:     "meep-redis-master:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	rejonsonClient := rejonson.ExtendClient(redisClient)

	pong, err := rejonsonClient.Ping().Result()

	if pong == "" {
		log.Info("pong is null")
		return err
	}

	if err != nil {
		log.Info("Redis DB not accessible")
		return err
	}
	dbClientStarted = true
	dbClient = rejonsonClient
	log.Info("Redis DB opened and well!")
	return nil
}

// // DBFlush - Empty DB
// func DBFlush(module string) error {
// 	var cursor uint64
// 	var err error
// 	log.Debug("DBFlush module: ", module)

// 	// Find all module keys
// 	// Process in chunks of 50 matching entries to optimize processing speed & memory
// 	keyMatchStr := module + ":*"
// 	for {
// 		var keys []string
// 		keys, cursor, err = dbClient.Scan(cursor, keyMatchStr, 50).Result()
// 		if err != nil {
// 			log.Debug("ERROR: ", err)
// 			break
// 		}

// 		// Delete all matching entries
// 		if len(keys) > 0 {
// 			_, err = dbClient.Del(keys...).Result()
// 			if err != nil {
// 				log.Debug("Failed to retrieve entry fields")
// 				break
// 			}
// 		}

// 		// Stop searching if cursor is back at beginning
// 		if cursor == 0 {
// 			break
// 		}
// 	}

// 	return nil
// }

// // DBForEachEntry - Search for matching keys and run handler for each entry
// func DBForEachEntry(keyMatchStr string, entryHandler func(string, map[string]string, interface{}) error, userData interface{}) error {
// 	var cursor uint64
// 	var err error

// 	// Process in chunks of 50 matching entries to optimize processing speed & memory
// 	for {
// 		var keys []string
// 		keys, cursor, err = dbClient.Scan(cursor, keyMatchStr, 50).Result()
// 		if err != nil {
// 			log.Debug("ERROR: ", err)
// 			break
// 		}

// 		if len(keys) > 0 {
// 			for i := 0; i < len(keys); i++ {
// 				fields, err := dbClient.HGetAll(keys[i]).Result()
// 				if err != nil || fields == nil {
// 					log.Debug("Failed to retrieve entry fields")
// 					break
// 				}

// 				// Invoke handler to process entry
// 				err = entryHandler(keys[i], fields, userData)
// 				if err != nil {
// 					return err
// 				}
// 			}
// 		}

// 		// Stop searching if cursor is back at beginning
// 		if cursor == 0 {
// 			break
// 		}
// 	}
// 	return nil
// }

// // DBSetEntry - Update existing entry or create new entry if it does not exist
// func DBSetEntry(key string, fields map[string]interface{}) error {
// 	// Update existing entry or create new entry if it does not exist
// 	_, err := dbClient.HMSet(key, fields).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// // DBRemoveEntry - Remove entry from DB
// func DBRemoveEntry(key string) error {
// 	_, err := dbClient.Del(key).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// DBJsonGetEntry - Retrieve entry from DB
func DBJsonGetEntry(key string, path string) (string, error) {
	// Update existing entry or create new entry if it does not exist
	json, err := dbClient.JsonGet(key, path).Result()
	if err != nil {
		return "", err
	}
	return json, nil
}

// // DBJsonSetEntry - Update existing entry or create new entry if it does not exist
// func DBJsonSetEntry(key string, path string, json string) error {
// 	// Update existing entry or create new entry if it does not exist
// 	_, err := dbClient.JsonSet(key, path, json).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// // DBJsonDelEntry - Remove existing entry
// func DBJsonDelEntry(key string, path string) error {
// 	_, err := dbClient.JsonDel(key, path).Result()
// 	if err != nil {
// 		return err
// 	}
// 	return nil
// }

// Subscribe - Register as a listener for provided channels
func Subscribe(channels ...string) error {
	pubsub = dbClient.Subscribe(channels...)
	return nil
}

// Listen - Wait for subscribed events
func Listen(handler func(string, string)) error {

	// Make sure listener is subscribed to pubsub
	if pubsub == nil {
		return errors.New("Not subscribed to pubsub")
	}

	// Main listening loop
	for {
		// Wait for subscribed channel events, or timeout
		msg, err := pubsub.ReceiveTimeout(time.Second)
		if err != nil {
			if reflect.TypeOf(err) == reflect.TypeOf(&net.OpError{}) &&
				reflect.TypeOf(err.(*net.OpError).Err).String() == "*net.timeoutError" {
				// Timeout, ignore and wait for next event
				continue
			}
		}

		// Process published event
		switch m := msg.(type) {

		// Process Subscription
		case *redis.Subscription:
			log.Info("Subscription Message: ", m.Kind, " to channel ", m.Channel, ". Total subscriptions: ", m.Count)

		// Process received Message
		case *redis.Message:
			log.Info("MSG on ", m.Channel, ": ", m.Payload)
			handler(m.Channel, m.Payload)
		}
	}
}

// // Publish - Publish message to channel
// func Publish(channel string, message string) error {
// 	log.Info("Publish to channel: ", channel, " Message: ", message)
// 	_, err := dbClient.Publish(channel, message).Result()
// 	return err
// }
+5 −4
Original line number Diff line number Diff line
@@ -3,12 +3,9 @@ module github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-webhook
go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db // indirect
	github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/ghodss/yaml v1.0.0
	github.com/go-redis/redis v6.15.2+incompatible
	k8s.io/api v0.0.0-20190430012547-97d6bb8ea5f4
	k8s.io/apimachinery v0.0.0-20190430211124-5bae42371a56
)
@@ -16,3 +13,7 @@ require (
replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model => ../../go-packages/meep-ctrl-engine-model

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model => ../../go-packages/meep-model

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis

go-apps/meep-webhook/go.sum

100644 → 100755
+8 −0
Original line number Diff line number Diff line
github.com/InterDigitalInc/AdvantEDGE v1.2.0 h1:46Mr4OqKz/6WU/1CziSacw62Z3pc52dgSOLppB5N/Bc=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db h1:Zkf5kwhxdW0xV7WM/crqIcOP5LCFGnAmumWSFAewJ74=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db/go.mod h1:RU+6d0CNIRSp6yo1mXLIIrnFa/3LHhvcDVLVJyovptM=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351 h1:1u1XrfCBnY+GijnyU6O1k4odp5TnqZQTsp5v7+n/E4Y=
@@ -7,10 +8,12 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/flimzy/kivik v1.8.1/go.mod h1:S2aPycbG0eDFll4wgXt9uacSNkXISPufutnc9sv+mdA=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kivik/couchdb v1.8.1/go.mod h1:5XJRkAMpBlEVA4q0ktIZjUPYBjoBmRoiWvwUBzP3BOQ=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM=
@@ -22,9 +25,11 @@ github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeq
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@@ -36,8 +41,11 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
+13 −4
Original line number Diff line number Diff line
@@ -29,18 +29,27 @@ import (
	"syscall"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
)

var Model *mod.Model

func main() {
	var parameters WhSvrParameters

	// Initialize logging
	log.MeepJSONLogInit("meep-webhook")

	// Connect to Active DB
	err := activeDBConnect()
	// Listen for model updates
	var err error
	Model, err = mod.NewModel(mod.DbAddress, "meep-webhook", "activeScenario")
	if err != nil {
		log.Error("Failed to create model: ", err.Error())
		return
	}
	err = Model.Listen(eventHandler)
	if err != nil {
		log.Error("Failed to connect to Active DB: ", err.Error())
		log.Error("Unable to listen to model updates: ", err.Error())
		return
	}

@@ -78,7 +87,7 @@ func main() {
	whsvr.server.Handler = mux

	// Start DB listener in new routine
	go activeDBListen()
	// go activeDBListen()

	// Start webhook server in new routine
	go func() {
+5 −72
Original line number Diff line number Diff line
@@ -25,7 +25,6 @@ import (
	"io/ioutil"
	"net/http"

	ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	"github.com/ghodss/yaml"
@@ -38,9 +37,6 @@ import (
	"k8s.io/apimachinery/pkg/runtime/serializer"
)

const moduleCtrlEngine string = "ctrl-engine"
const typeActive string = "active"
const channelCtrlActive string = moduleCtrlEngine + "-" + typeActive
const meepOrigin = "scenario"

// Active scenarion name
@@ -81,84 +77,21 @@ func init() {
	_ = admissionregistrationv1beta1.AddToScheme(runtimeScheme)
}

func activeDBConnect() (err error) {
	// Connect to Active DB
	err = DBConnect()
	if err != nil {
		log.Error("Failed connection to Active DB. Error: ", err)
		return err
	}
	log.Info("Connected to Active DB")

	// Subscribe to Pub-Sub events for MEEP Controller
	// NOTE: Current implementation is RedisDB Pub-Sub
	err = Subscribe(channelCtrlActive)
	if err != nil {
		log.Error("Failed to subscribe to Pub/Sub events. Error: ", err)
		return
	}
	log.Info("Subscribed to Pub/Sub events")

	// Initialize using current active scenario
	processActiveScenarioUpdate()

	return nil
}

func activeDBListen() {
	// Listen for subscribed events. Provide event handler method.
	_ = Listen(eventHandler)
}

func eventHandler(channel string, payload string) {
	// Handle Message according to Rx Channel
	switch channel {

	// MEEP Ctrl Engine active scenario update Channel
	case channelCtrlActive:
		log.Debug("Event received on channel: ", channelCtrlActive)
		processActiveScenarioUpdate()
	// case channelCtrlActive:
	case Model.ActiveChannel:
		log.Debug("Event received on channel: ", Model.ActiveChannel)
		// processActiveScenarioUpdate()
		activeScenarioName = Model.GetScenarioName()

	default:
		log.Warn("Unsupported channel")
	}
}

func processActiveScenarioUpdate() {
	// Retrieve active scenario from DB
	jsonScenario, err := DBJsonGetEntry(moduleCtrlEngine+":"+typeActive, ".")
	if err != nil {
		log.Error(err.Error())
		clearScenario()
		return
	}

	// Unmarshal Active scenario
	var scenario ceModel.Scenario
	err = json.Unmarshal([]byte(jsonScenario), &scenario)
	if err != nil {
		log.Error(err.Error())
		clearScenario()
		return
	}

	// Parse scenario
	parseScenario(scenario)
}

func clearScenario() {
	log.Debug("clearScenario() -- Resetting all variables")
	activeScenarioName = ""
}

func parseScenario(scenario ceModel.Scenario) {
	log.Debug("parseScenario")

	// Update active scenatio name
	activeScenarioName = scenario.Name
	log.Info("Active scenario name set to: ", activeScenarioName)
}

func loadConfig(configFile string) (*Config, error) {
	data, err := ioutil.ReadFile(configFile)
	if err != nil {