Commit 7876a2f5 authored by Mudassar Khan's avatar Mudassar Khan
Browse files

add retry logic for publishing and unpublishing services

parent 548a3fe8
Loading
Loading
Loading
Loading
+205 −23
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package server

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
@@ -47,6 +48,10 @@ const SER_AVAILABILITY_NOTIF_SUB_TYPE = "SerAvailabilityNotificationSubscription
const SER_AVAILABILITY_NOTIF_TYPE = "SerAvailabilityNotification"
const APP_STATE_READY = "READY"

// CAPIF retry configuration
const capifMaxRetries = 5
const capifRetryInterval = 30 * time.Second

// const logModuleAppEnablement = "meep-app-enablement"
const serviceName = "App Enablement Service"

@@ -94,7 +99,29 @@ type StateData struct {
	AppId string
}

type capifPendingPublishInfo struct {
	SerInstanceId string `json:"serInstanceId"`
	AppId         string `json:"appId"`
	ApiName       string `json:"apiName"`
	Version       string `json:"version"`
	URI           string `json:"uri"`
	SerName       string `json:"serName"`
	Description   string `json:"description"`
	Host          string `json:"host"`
	Port          int    `json:"port"`
	RetryCount    int    `json:"retryCount"`
	LastError     string `json:"lastError"`
}

type capifPendingUnpublishInfo struct {
	SerInstanceId string `json:"serInstanceId"`
	ApiId         string `json:"apiId"`
	RetryCount    int    `json:"retryCount"`
	LastError     string `json:"lastError"`
}

var livenessTimerList map[string]ServiceLivenessInfo
var capifRetryStop chan struct{}

func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex, capifCl *cc.CapifClient) (err error) {
	redisAddr = redisAddr_
@@ -162,12 +189,21 @@ func Run() (err error) {
		return err
	}

	// Start background worker to retry failed CAPIF publish/unpublish operations
	capifRetryStop = make(chan struct{})
	go runCapifRetryWorker(capifRetryStop)

	return nil
}

// Stop - Stop Service Mgmt
func Stop() (err error) {

	if capifRetryStop != nil {
		close(capifRetryStop)
		capifRetryStop = nil
	}

	if len(livenessTimerList) != 0 {
		livenessTimerList = make(map[string]ServiceLivenessInfo)
	}
@@ -421,6 +457,19 @@ func appServicesPOST(w http.ResponseWriter, r *http.Request) {
			)
			if err != nil {
				log.Error("CAPIF publish failed for service ", sInfo.SerName, ": ", err.Error())
				storePendingPublish(capifPendingPublishInfo{
					SerInstanceId: sInfo.SerInstanceId,
					AppId:         appId,
					ApiName:       sInfo.SerName + "-" + sandboxName,
					Version:       sInfo.Version,
					URI:           uri,
					SerName:       sInfo.SerName,
					Description:   "MEC service: " + sInfo.SerName,
					Host:          host,
					Port:          port,
					RetryCount:    0,
					LastError:     err.Error(),
				})
				return
			}
			storeCapifMapping(sInfo.SerInstanceId, apiId)
@@ -602,12 +651,21 @@ func appServicesByIdDELETE(w http.ResponseWriter, r *http.Request) {
	if capifClient != nil && capifClient.IsReady() {
		go func(svcId string) {
			apiId := getCapifMapping(svcId)
			if apiId != "" {
			if apiId == "" {
				return
			}
			if err := capifClient.UnpublishServiceAPI(apiId); err != nil {
				log.Error("CAPIF unpublish failed for service ", svcId, ": ", err.Error())
				// Keep mapping in Redis and schedule for retry
				storePendingUnpublish(capifPendingUnpublishInfo{
					SerInstanceId: svcId,
					ApiId:         apiId,
					RetryCount:    0,
					LastError:     err.Error(),
				})
				return
			}
			deleteCapifMapping(svcId)
			}
		}(svcId)
	}

@@ -1835,6 +1893,111 @@ func deleteCapifMapping(localId string) {
	}
}

// --- CAPIF retry / dead-letter helpers ---

func storePendingPublish(info capifPendingPublishInfo) {
	key := baseKey + "capif:pending:pub:" + info.SerInstanceId
	data, err := json.Marshal(info)
	if err != nil {
		log.Error("Failed to marshal pending publish info: ", err.Error())
		return
	}
	if err := rc.JSONSetEntry(key, ".", string(data)); err != nil {
		log.Error("Failed to store pending publish for ", info.SerInstanceId, ": ", err.Error())
	}
}

func storePendingUnpublish(info capifPendingUnpublishInfo) {
	key := baseKey + "capif:pending:unpub:" + info.SerInstanceId
	data, err := json.Marshal(info)
	if err != nil {
		log.Error("Failed to marshal pending unpublish info: ", err.Error())
		return
	}
	if err := rc.JSONSetEntry(key, ".", string(data)); err != nil {
		log.Error("Failed to store pending unpublish for ", info.SerInstanceId, ": ", err.Error())
	}
}

func runCapifRetryWorker(stop chan struct{}) {
	ticker := time.NewTicker(capifRetryInterval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if capifClient == nil || !capifClient.IsReady() {
				continue
			}
			retryPendingPublishes()
			retryPendingUnpublishes()
		}
	}
}

func retryPendingPublishes() {
	_ = rc.ForEachJSONEntry(baseKey+"capif:pending:pub:*", func(key, val string, _ interface{}) error {
		var info capifPendingPublishInfo
		if err := json.Unmarshal([]byte(val), &info); err != nil {
			log.Error("CAPIF retry: failed to unmarshal pending publish entry: ", err.Error())
			return nil
		}
		apiId, err := capifClient.PublishServiceAPI(
			info.ApiName,
			info.Version,
			info.URI,
			info.SerName,
			info.Description,
			info.Host,
			info.Port,
		)
		if err != nil {
			info.RetryCount++
			info.LastError = err.Error()
			if info.RetryCount >= capifMaxRetries {
				log.Error("CAPIF publish dead letter: service ", info.SerName, " (", info.SerInstanceId, ") exceeded max retries. Last error: ", info.LastError)
				_ = rc.JSONDelEntry(key, ".")
			} else {
				log.Warn("CAPIF publish retry ", info.RetryCount, "/", capifMaxRetries, " failed for service ", info.SerName, ": ", err.Error())
				storePendingPublish(info)
			}
			return nil
		}
		storeCapifMapping(info.SerInstanceId, apiId)
		_ = rc.JSONDelEntry(key, ".")
		log.Info("CAPIF publish retry succeeded for service ", info.SerName)
		return nil
	}, nil)
}

func retryPendingUnpublishes() {
	_ = rc.ForEachJSONEntry(baseKey+"capif:pending:unpub:*", func(key, val string, _ interface{}) error {
		var info capifPendingUnpublishInfo
		if err := json.Unmarshal([]byte(val), &info); err != nil {
			log.Error("CAPIF retry: failed to unmarshal pending unpublish entry: ", err.Error())
			return nil
		}
		if err := capifClient.UnpublishServiceAPI(info.ApiId); err != nil {
			info.RetryCount++
			info.LastError = err.Error()
			if info.RetryCount >= capifMaxRetries {
				log.Error("CAPIF unpublish dead letter: service ", info.SerInstanceId, " (apiId=", info.ApiId, ") exceeded max retries. Last error: ", info.LastError)
				deleteCapifMapping(info.SerInstanceId)
				_ = rc.JSONDelEntry(key, ".")
			} else {
				log.Warn("CAPIF unpublish retry ", info.RetryCount, "/", capifMaxRetries, " failed for service ", info.SerInstanceId, ": ", err.Error())
				storePendingUnpublish(info)
			}
			return nil
		}
		deleteCapifMapping(info.SerInstanceId)
		_ = rc.JSONDelEntry(key, ".")
		log.Info("CAPIF unpublish retry succeeded for service ", info.SerInstanceId)
		return nil
	}, nil)
}

// UnpublishAllServices unpublishes all CAPIF-published services and clears their mappings.
// Called during scenario termination / service shutdown.
// RegisterCapifService registers the CAPIF Register API as a MEC service in MEC011.
@@ -1924,26 +2087,45 @@ func UnpublishAllServices() {
	if capifClient == nil || !capifClient.IsReady() {
		return
	}
	log.Info("Unpublishing all CAPIF services...")
	log.Info("Unpublishing all CAPIF services (concurrent)...")

	keyPattern := baseKey + "capif:svc:*"
	err := rc.ForEachJSONEntry(keyPattern, unpublishCapifEntry, nil)
	if err != nil {
		log.Error("Failed to iterate CAPIF mappings: ", err.Error())
	}
}
	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
	defer cancel()

func unpublishCapifEntry(key string, val string, _ interface{}) error {
	var wg sync.WaitGroup
	sem := make(chan struct{}, 10) // max 10 concurrent unpublish calls

	keyPattern := baseKey + "capif:svc:*"
	err := rc.ForEachJSONEntry(keyPattern, func(key, val string, _ interface{}) error {
		apiId := strings.Trim(val, "\"")
		if apiId == "" {
			return nil
		}
	if err := capifClient.UnpublishServiceAPI(apiId); err != nil {
		log.Error("CAPIF unpublish failed for apiId ", apiId, ": ", err.Error())
		// Stop launching new goroutines if shutdown deadline is reached
		select {
		case <-ctx.Done():
			log.Warn("CAPIF shutdown: timeout reached, skipping remaining unpublish operations")
			return fmt.Errorf("shutdown timeout exceeded")
		case sem <- struct{}{}:
		}
		wg.Add(1)
		go func(k, id string) {
			defer wg.Done()
			defer func() { <-sem }()
			if ctx.Err() != nil {
				return
			}
			if err := capifClient.UnpublishServiceAPI(id); err != nil {
				log.Error("CAPIF unpublish failed for apiId ", id, ": ", err.Error())
			} else {
		log.Info("CAPIF service unpublished: apiId=", apiId)
				log.Info("CAPIF service unpublished: apiId=", id)
			}
	// Delete the mapping key
	_ = rc.JSONDelEntry(key, ".")
			_ = rc.JSONDelEntry(k, ".")
		}(key, apiId)
		return nil
	}, nil)
	if err != nil && err.Error() != "shutdown timeout exceeded" {
		log.Error("Failed to iterate CAPIF mappings: ", err.Error())
	}
	wg.Wait()
}