Skip to content
routing-engine.go 7.63 KiB
Newer Older
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"encoding/json"
	"strconv"

	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mgModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

// Key suffix and rule type identifiers. typeLb is appended to a store base key
// to address LB data; the remaining values are written to the svc-type field
// of each generated rule.
const (
	typeLb         string = "lb"
	typeMeSvc      string = "ME-SVC"
	typeIngressSvc string = "INGRESS-SVC"
	typeEgressSvc  string = "EGRESS-SVC"
)

// Field names of an LB rule entry as stored in the DB.
const (
	fieldSvcType     string = "svc-type"
	fieldSvcName     string = "svc-name"
	fieldSvcIp       string = "svc-ip"
	fieldSvcProtocol string = "svc-protocol"
	fieldSvcPort     string = "svc-port"
	fieldLbSvcName   string = "lb-svc-name"
	fieldLbSvcIp     string = "lb-svc-ip"
	fieldLbSvcPort   string = "lb-svc-port"
	fieldLbPodName   string = "lb-pod-name"
	fieldLbPodIp     string = "lb-pod-ip"
)

// DEFAULT_LB_RULES_DB - Second argument given to redis.NewConnector when the
// LB Rules Store is opened (presumably the Redis DB index - confirm against
// the meep-redis package).
const DEFAULT_LB_RULES_DB = 0

// LbRulesStore - Handle on the DB holding the MG Manager load balancing rules
type LbRulesStore struct {
	// Key prefix under which the LB rules are read (base key + typeLb)
	baseKey string
	// Redis connector used to access the store
	rc      *redis.Connector
}

// RoutingEngine - Fetches MG Manager LB rules, converts them to per-pod
// service rules and notifies sidecars of updates
type RoutingEngine struct {
	// Engine name provided at creation
	name         string
	// Sandbox name provided at creation
	sandboxName  string
	// Store from which the LB rules are retrieved
	lbRulesStore *LbRulesStore
}

// NewRoutingEngine - Create a Routing Engine and open its LB Rules Store
//
// name and sandboxName are stored on the returned engine; sandboxName is also
// used to derive the base key of the LB Rules Store. On failure to create the
// Redis connector, the error is logged and returned with a nil engine.
func NewRoutingEngine(name string, sandboxName string) (re *RoutingEngine, err error) {
	// Create new Routing Engine instance
	re = new(RoutingEngine)
	re.name = name
	re.sandboxName = sandboxName

	// Open Load Balancing Rules Store.
	// The base key is derived from the sandbox name received as argument rather
	// than from the package-level TC Engine, so the constructor honours its
	// parameter and does not depend on global state being initialized.
	re.lbRulesStore = new(LbRulesStore)
	re.lbRulesStore.baseKey = dkm.GetKeyRoot(sandboxName) + mgManagerKey
	re.lbRulesStore.rc, err = redis.NewConnector(redisAddr, DEFAULT_LB_RULES_DB)
	if err != nil {
		log.Error("Failed connection to LB Rules Store Redis DB. Error: ", err)
		return nil, err
	}
	log.Info("Connected to LB Rules Store redis DB")

	log.Info("Successfully create Routing Engine")
	return re, nil
}

// RefreshLbRules - Fetch & apply latest MG Manager LB rules
func (re *RoutingEngine) RefreshLbRules() {

	// Retrieve LB rules from DB
	jsonNetElemList, err := re.lbRulesStore.rc.JSONGetEntry(re.lbRulesStore.baseKey+typeLb, ".")
	if err != nil {
		log.Error(err.Error())
		return
	}

	// Unmarshal MG Service Maps
	var netElemList mgModel.NetworkElementList
	err = json.Unmarshal([]byte(jsonNetElemList), &netElemList)
	if err != nil {
		log.Error(err.Error())
		return
	}

	// Update pod MG service mappings
	for _, netElem := range netElemList.NetworkElements {
		podInfo := podInfoMap[netElem.Name]
		if podInfo == nil {
			log.Error("Failed to find network element: ", netElem.Name)
			continue
		}

		// Set load balanced MG Service instance
		for _, svcMap := range netElem.ServiceMaps {
			if svcInfo, found := svcInfoMap[svcMap.LbSvcName]; found {
				podInfo.MgSvcMap[svcMap.MgSvcName] = svcInfo
			} else {
				log.Error("failed to find service instance: ", svcMap.LbSvcName)
		}
	}

	// Apply new MG Service mapping rules
	re.applyLbRules()
	// Inform sidecars of LB rule updates
	re.publishLbRulesUpdate()
}

// publishLbRulesUpdate - Notify TC Sidecars that the LB rules have changed
//
// Creates a TC LB Rules update message on the local message queue, logs it at
// debug level and sends it. A send failure is logged; nothing is returned.
func (re *RoutingEngine) publishLbRulesUpdate() {
	updateMsg := tce.mqLocal.CreateMsg(mq.MsgTcLbRulesUpdate, moduleTcSidecar, tce.sandboxName)
	log.Debug("TX MSG: ", mq.PrintMsg(updateMsg))

	// Sidecars enforce the rules upon reception of this message
	if sendErr := tce.mqLocal.SendMsg(updateMsg); sendErr != nil {
		log.Error("Failed to send message. Error: ", sendErr.Error())
	}
}

// applyLbRules - Generate & store rules based on mapping
//
// For every pod in podInfoMap, writes one DB entry per MG service port, per
// ingress service mapping and per egress service mapping. Every generated key
// is recorded, and DB entries matching the LB key pattern that were not
// generated in this pass are then removed as stale. DB write failures are
// logged and do not interrupt rule generation.
func (re *RoutingEngine) applyLbRules() {
	log.Debug("applyLbRules")

	// Keys generated during this pass; used afterwards to detect stale entries
	keys := map[string]bool{}

	// Prefix shared by every LB rule key
	keyPrefix := tce.netCharStore.baseKey + typeLb + ":"

	// storeRule records the key as current and writes the rule to the DB.
	// The key is recorded even when the write fails so that a previously
	// stored entry with the same key is not removed by the stale cleanup.
	storeRule := func(key string, fields map[string]interface{}) {
		keys[key] = true
		if err := tce.netCharStore.rc.SetEntry(key, fields); err != nil {
			log.Error("Failed to set LB rule "+key+". Error: ", err)
		}
	}

	// For each pod, add MG, ingress & egress Service LB rules
	for _, podInfo := range podInfoMap {
		podKeyPrefix := keyPrefix + podInfo.Name + ":"

		// MG Service LB rules
		for _, svcInfo := range podInfo.MgSvcMap {
			// Add one rule per port
			for _, portInfo := range svcInfo.Ports {
				// Populate rule fields
				fields := make(map[string]interface{})
				fields[fieldSvcType] = typeMeSvc
				fields[fieldSvcName] = svcInfo.MgSvc.Name
				fields[fieldSvcIp] = tce.ipManager.GetSvcIp(svcInfo.MgSvc.Name)
				fields[fieldSvcProtocol] = portInfo.Protocol
				fields[fieldSvcPort] = portInfo.Port
				fields[fieldLbSvcName] = svcInfo.Name
				fields[fieldLbSvcIp] = tce.ipManager.GetSvcIp(svcInfo.Name)
				fields[fieldLbSvcPort] = portInfo.Port
				fields[fieldLbPodName] = svcInfo.Node
				fields[fieldLbPodIp] = tce.ipManager.GetPodIp(svcInfo.Node)

				// Unique key: pod name, MG service name & port
				storeRule(podKeyPrefix+svcInfo.MgSvc.Name+":"+strconv.Itoa(int(portInfo.Port)), fields)
			}
		}

		// Ingress Service rules
		for _, svcMap := range podInfo.IngressSvcMapList {
			// Get Service info from exposed service name
			// Check if MG Service first
			svcInfo, found := podInfo.MgSvcMap[svcMap.SvcName]
			if !found {
				// If not found, must be unique service
				if svcInfo, found = svcInfoMap[svcMap.SvcName]; !found {
					log.Warn("Failed to find service instance: ", svcMap.SvcName)
					continue
				}
			}

			// Populate rule fields
			fields := make(map[string]interface{})
			fields[fieldSvcType] = typeIngressSvc
			fields[fieldSvcName] = svcMap.SvcName
			fields[fieldSvcIp] = "0.0.0.0/0"
			fields[fieldSvcProtocol] = svcMap.Protocol
			fields[fieldSvcPort] = svcMap.NodePort
			fields[fieldLbSvcName] = svcInfo.Name
			fields[fieldLbSvcIp] = tce.ipManager.GetSvcIp(svcInfo.Name)
			fields[fieldLbSvcPort] = svcMap.SvcPort
			fields[fieldLbPodName] = svcInfo.Node
			fields[fieldLbPodIp] = tce.ipManager.GetPodIp(svcInfo.Node)

			// Unique key: pod name, exposed service name & node port
			storeRule(podKeyPrefix+svcMap.SvcName+":"+strconv.Itoa(int(svcMap.NodePort)), fields)
		}

		// Egress Service rules
		for _, svcMap := range podInfo.EgressSvcMapList {
			// Populate rule fields
			fields := make(map[string]interface{})
			fields[fieldSvcType] = typeEgressSvc
			fields[fieldSvcName] = svcMap.SvcName
			fields[fieldSvcIp] = "0.0.0.0/0"
			fields[fieldSvcProtocol] = svcMap.Protocol
			fields[fieldSvcPort] = svcMap.SvcPort
			fields[fieldLbSvcName] = svcMap.SvcName
			fields[fieldLbSvcIp] = svcMap.SvcIp
			fields[fieldLbSvcPort] = svcMap.SvcPort
			fields[fieldLbPodName] = "n/a"
			fields[fieldLbPodIp] = IP_ADDR_NONE

			// Unique key: pod name, service name & service port
			storeRule(podKeyPrefix+svcMap.SvcName+":"+strconv.Itoa(int(svcMap.SvcPort)), fields)
		}
	}

	// Remove stale DB entries
	if err := tce.netCharStore.rc.ForEachEntry(keyPrefix+"*", removeLbEntryHandler, &keys); err != nil {
		log.Error("Failed to remove old entries with err: ", err)
	}
}

// removeLbEntryHandler - Entry handler deleting LB rule entries that are no longer current
//
// key is the DB key of the visited entry and fields its content (unused).
// userData must be a *map[string]bool holding the keys to keep; any other
// type makes the type assertion panic. An entry whose key is absent from that
// map is deleted. A deletion failure is logged; the handler always returns nil
// so that one failed deletion is not reported as a handler error.
func removeLbEntryHandler(key string, fields map[string]string, userData interface{}) error {
	keys := userData.(*map[string]bool)

	// Entry was generated in the current pass: keep it
	if _, found := (*keys)[key]; found {
		return nil
	}

	if err := tce.netCharStore.rc.DelEntry(key); err != nil {
		log.Error("Failed to delete stale LB rule "+key+". Error: ", err)
	}
	return nil
}