/*
 * Copyright (c) 2019 InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package main
import (
"encoding/json"
"strconv"
dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
mgModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model"
mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)
// Key segment appended to a store base key to address LB rule entries
const typeLb string = "lb"

// LB rule types, stored in the "svc-type" rule field
const (
	typeMeSvc      string = "ME-SVC"
	typeIngressSvc string = "INGRESS-SVC"
	typeEgressSvc  string = "EGRESS-SVC"
)

// LB rule field names
const (
	fieldSvcType     string = "svc-type"
	fieldSvcName     string = "svc-name"
	fieldSvcIp       string = "svc-ip"
	fieldSvcProtocol string = "svc-protocol"
	fieldSvcPort     string = "svc-port"
	fieldLbSvcName   string = "lb-svc-name"
	fieldLbSvcIp     string = "lb-svc-ip"
	fieldLbSvcPort   string = "lb-svc-port"
	fieldLbPodName   string = "lb-pod-name"
	fieldLbPodIp     string = "lb-pod-ip"
)

// DEFAULT_LB_RULES_DB - Redis DB passed to redis.NewConnector when opening the LB Rules Store
const DEFAULT_LB_RULES_DB = 0
// LbRulesStore - Access handle for the store holding the MG Manager LB rules
type LbRulesStore struct {
	// Key prefix of the entries read from this store
	baseKey string

	// Redis connector used to reach the store
	rc *redis.Connector
}
// RoutingEngine - Routing engine instance state
type RoutingEngine struct {
	// Name given to this engine at creation
	name string

	// Sandbox name given to this engine at creation
	sandboxName string

	// Store from which the LB rules are fetched
	lbRulesStore *LbRulesStore
}
// NewRoutingEngine - Create a Routing Engine and connect it to the LB Rules Store
//
// name and sandboxName are recorded on the returned instance. The store base key
// is the key root of sandboxName followed by mgManagerKey, and the store
// connector is opened on redisAddr with DEFAULT_LB_RULES_DB.
//
// Returns (nil, err) when the redis connector cannot be created.
func NewRoutingEngine(name string, sandboxName string) (re *RoutingEngine, err error) {
	// Create new Routing Engine instance
	re = new(RoutingEngine)
	re.name = name
	re.sandboxName = sandboxName

	// Open Load Balancing Rules Store
	// The key root comes from the sandbox name handed to this constructor, so
	// the engine does not depend on package-level state being set up first
	re.lbRulesStore = new(LbRulesStore)
	re.lbRulesStore.baseKey = dkm.GetKeyRoot(sandboxName) + mgManagerKey
	re.lbRulesStore.rc, err = redis.NewConnector(redisAddr, DEFAULT_LB_RULES_DB)
	if err != nil {
		log.Error("Failed connection to LB Rules Store Redis DB. Error: ", err)
		return nil, err
	}
	log.Info("Connected to LB Rules Store redis DB")

	log.Info("Successfully create Routing Engine")
	return re, nil
}
// RefreshLbRules - Fetch & apply latest MG Manager LB rules
func (re *RoutingEngine) RefreshLbRules() {
// Retrieve LB rules from DB
jsonNetElemList, err := re.lbRulesStore.rc.JSONGetEntry(re.lbRulesStore.baseKey+typeLb, ".")
if err != nil {
log.Error(err.Error())
return
}
// Unmarshal MG Service Maps
var netElemList mgModel.NetworkElementList
err = json.Unmarshal([]byte(jsonNetElemList), &netElemList)
if err != nil {
log.Error(err.Error())
return
}
// Update pod MG service mappings
for _, netElem := range netElemList.NetworkElements {
podInfo := podInfoMap[netElem.Name]
if podInfo == nil {
log.Error("Failed to find network element: ", netElem.Name)
continue
}
// Set load balanced MG Service instance
for _, svcMap := range netElem.ServiceMaps {
if svcInfo, found := svcInfoMap[svcMap.LbSvcName]; found {
podInfo.MgSvcMap[svcMap.MgSvcName] = svcInfo
} else {
log.Error("failed to find service instance: ", svcMap.LbSvcName)
}
}
// Apply new MG Service mapping rules
// Inform sidecars of LB rule updates
re.publishLbRulesUpdate()
}
// publishLbRulesUpdate - Inform sidecars of LB rules update
//
// Creates a MsgTcLbRulesUpdate message addressed to moduleTcSidecar on the local
// message queue and sends it. A send failure is logged, not returned.
func (re *RoutingEngine) publishLbRulesUpdate() {
	// Send TC LB Rules update message to TC Sidecars for enforcement
	msg := tce.mqLocal.CreateMsg(mq.MsgTcLbRulesUpdate, moduleTcSidecar, tce.sandboxName)
	log.Debug("TX MSG: ", mq.PrintMsg(msg))
	err := tce.mqLocal.SendMsg(msg)
	if err != nil {
		log.Error("Failed to send message. Error: ", err.Error())
	}
}
// applyLbRules - Generate & store rules based on mapping
//
// For every pod in podInfoMap, one rule entry is written per:
//   - port of each mapped MG service (typeMeSvc)
//   - ingress service mapping (typeIngressSvc)
//   - egress service mapping (typeEgressSvc)
//
// Keys written during this pass are remembered; every other entry found under
// the LB rule key prefix is then removed. Store failures are logged and do not
// interrupt the pass.
func (re *RoutingEngine) applyLbRules() {
	log.Debug("applyLbRules")

	// Keys written during this pass; used afterwards to spot stale entries
	keys := map[string]bool{}

	// Prefix shared by every LB rule key
	lbKeyPrefix := tce.netCharStore.baseKey + typeLb + ":"

	// storeRule marks the key as current and writes the rule to the DB.
	// The key is recorded even when the write fails, so an entry that already
	// exists under that key is not removed as stale.
	storeRule := func(key string, fields map[string]interface{}) {
		keys[key] = true
		if err := tce.netCharStore.rc.SetEntry(key, fields); err != nil {
			log.Error("Failed to set LB rule entry: ", key, " Error: ", err)
		}
	}

	// For each pod, add MG, ingress & egress Service LB rules
	for _, podInfo := range podInfoMap {
		podKeyPrefix := lbKeyPrefix + podInfo.Name + ":"

		// MG Service LB rules
		for _, svcInfo := range podInfo.MgSvcMap {
			// Add one rule per port
			for _, portInfo := range svcInfo.Ports {
				// Populate rule fields
				fields := map[string]interface{}{
					fieldSvcType:     typeMeSvc,
					fieldSvcName:     svcInfo.MgSvc.Name,
					fieldSvcIp:       tce.ipManager.GetSvcIp(svcInfo.MgSvc.Name),
					fieldSvcProtocol: portInfo.Protocol,
					fieldSvcPort:     portInfo.Port,
					fieldLbSvcName:   svcInfo.Name,
					fieldLbSvcIp:     tce.ipManager.GetSvcIp(svcInfo.Name),
					fieldLbSvcPort:   portInfo.Port,
					fieldLbPodName:   svcInfo.Node,
					fieldLbPodIp:     tce.ipManager.GetPodIp(svcInfo.Node),
				}

				// Make unique key & set rule information in DB
				key := podKeyPrefix + svcInfo.MgSvc.Name + ":" + strconv.Itoa(int(portInfo.Port))
				storeRule(key, fields)
			}
		}

		// Ingress Service rules
		for _, svcMap := range podInfo.IngressSvcMapList {
			// Get Service info from exposed service name
			// Check if MG Service first; if not found, must be unique service
			var svcInfo *ServiceInfo
			var found bool
			if svcInfo, found = podInfo.MgSvcMap[svcMap.SvcName]; !found {
				svcInfo, found = svcInfoMap[svcMap.SvcName]
			}
			if !found {
				log.Warn("Failed to find service instance: ", svcMap.SvcName)
				continue
			}

			// Populate rule fields
			fields := map[string]interface{}{
				fieldSvcType:     typeIngressSvc,
				fieldSvcName:     svcMap.SvcName,
				fieldSvcIp:       "0.0.0.0/0",
				fieldSvcProtocol: svcMap.Protocol,
				fieldSvcPort:     svcMap.NodePort,
				fieldLbSvcName:   svcInfo.Name,
				fieldLbSvcIp:     tce.ipManager.GetSvcIp(svcInfo.Name),
				fieldLbSvcPort:   svcMap.SvcPort,
				fieldLbPodName:   svcInfo.Node,
				fieldLbPodIp:     tce.ipManager.GetPodIp(svcInfo.Node),
			}

			// Make unique key & set rule information in DB
			key := podKeyPrefix + svcMap.SvcName + ":" + strconv.Itoa(int(svcMap.NodePort))
			storeRule(key, fields)
		}

		// Egress Service rules
		for _, svcMap := range podInfo.EgressSvcMapList {
			// Populate rule fields
			fields := map[string]interface{}{
				fieldSvcType:     typeEgressSvc,
				fieldSvcName:     svcMap.SvcName,
				fieldSvcIp:       "0.0.0.0/0",
				fieldSvcProtocol: svcMap.Protocol,
				fieldSvcPort:     svcMap.SvcPort,
				fieldLbSvcName:   svcMap.SvcName,
				fieldLbSvcIp:     svcMap.SvcIp,
				fieldLbSvcPort:   svcMap.SvcPort,
				fieldLbPodName:   "n/a",
				fieldLbPodIp:     IP_ADDR_NONE,
			}

			// Make unique key & set rule information in DB
			key := podKeyPrefix + svcMap.SvcName + ":" + strconv.Itoa(int(svcMap.SvcPort))
			storeRule(key, fields)
		}
	}

	// Remove every LB rule entry that was not written during this pass
	err := tce.netCharStore.rc.ForEachEntry(lbKeyPrefix+"*", removeLbEntryHandler, &keys)
	if err != nil {
		log.Error("Failed to remove old entries with err: ", err)
	}
}
func removeLbEntryHandler(key string, fields map[string]string, userData interface{}) error {
keys := userData.(*map[string]bool)
if _, found := (*keys)[key]; !found {
_ = tce.netCharStore.rc.DelEntry(key)
}
return nil
}