Commit 46f96755 authored by Yann Garcia's avatar Yann Garcia
Browse files

Add Liveness support (message part)

parent d0dfb58a
Loading
Loading
Loading
Loading
+10 −1
Original line number Diff line number Diff line
@@ -87,7 +87,7 @@ func convertSubscriptionLinkListToJson(obj *SubscriptionLinkList) string {
	return string(jsonInfo)
}

func convertMecServiceMgmtApiSubscriptionLinkListToJson(obj *MecServiceMgmtApiSubscriptionLinkList) string {
func convertServiceLivenessInfoToJson(obj *ServiceLivenessInfo) string {
	jsonInfo, err := json.Marshal(*obj)
	if err != nil {
		log.Error(err.Error())
@@ -96,6 +96,15 @@ func convertMecServiceMgmtApiSubscriptionLinkListToJson(obj *MecServiceMgmtApiSu
	return string(jsonInfo)
}

// func convertMecServiceMgmtApiSubscriptionLinkListToJson(obj *MecServiceMgmtApiSubscriptionLinkList) string {
// 	jsonInfo, err := json.Marshal(*obj)
// 	if err != nil {
// 		log.Error(err.Error())
// 		return ""
// 	}
// 	return string(jsonInfo)
// }

func convertProblemDetailstoJson(probdetails *ProblemDetails) string {
	jsonInfo, err := json.Marshal(*probdetails)
	if err != nil {
+4 −4
Original line number Diff line number Diff line
@@ -13,8 +13,8 @@ package server
type ServiceState string

// List of ServiceState
const (
	ACTIVE    ServiceState = "ACTIVE"
	INACTIVE  ServiceState = "INACTIVE"
	SUSPENDED ServiceState = "SUSPENDED"
var ( // FSCOM Change manually
	ACTIVE_ServiceState    ServiceState = "ACTIVE"
	INACTIVE_ServiceState  ServiceState = "INACTIVE"
	SUSPENDED_ServiceState ServiceState = "SUSPENDED"
)
+141 −39
Original line number Diff line number Diff line
@@ -77,10 +77,6 @@ type ServiceInfoList struct {
	Filters                  *FilterParameters
}

type LivenessTimer struct {
	ticker *time.Ticker
}

type FilterParameters struct {
	serInstanceId     []string
	serName           []string
@@ -95,7 +91,7 @@ type StateData struct {
	AppId string
}

var livenessTimerList map[string]LivenessTimer
var livenessTimerList map[string]ServiceLivenessInfo

func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex) (err error) {
	redisAddr = redisAddr_
@@ -144,7 +140,7 @@ func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redi
	}
	log.Info("Created Subscription Manager")

	livenessTimerList = make(map[string]LivenessTimer)
	livenessTimerList = make(map[string]ServiceLivenessInfo)

	// TODO -- Initialize subscriptions from DB

@@ -169,19 +165,37 @@ func Run() (err error) {
func Stop() (err error) {

	if len(livenessTimerList) != 0 {
		log.Info("Stop all Liveness timers")
		for _, livenessTimer := range livenessTimerList {
			if livenessTimer.ticker != nil {
				livenessTimer.ticker.Stop()
			}
			livenessTimer.ticker = nil
		} // End of 'for' statement
		livenessTimerList = make(map[string]LivenessTimer)
		livenessTimerList = make(map[string]ServiceLivenessInfo)
	}

	return nil
}

func createLivenessTicker(sInfo ServiceInfo) {
	log.Info(">>> createLivenessTicker: ", sInfo)

	livenessTimerList[sInfo.SerInstanceId] = ServiceLivenessInfo{
		State:     &ACTIVE_ServiceState,
		TimeStamp: &ServiceLivenessInfoTimeStamp{Seconds: 0, NanoSeconds: 0},
		Interval:  sInfo.LivenessInterval,
	}
}

func updateLivenessTicker(sInfo ServiceInfo) {
	log.Info(">>> updateLivenessTicker: ", sInfo)

	if sInfo.LivenessInterval != livenessTimerList[sInfo.SerInstanceId].Interval {
		deleteLivenessTicker(sInfo.SerInstanceId)
		createLivenessTicker(sInfo)
	}
}

func deleteLivenessTicker(serInstanceId string) {
	log.Info(">>> deleteLivenessTicker: ", serInstanceId)

	delete(livenessTimerList, serInstanceId)
}

// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
@@ -196,8 +210,9 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
}

func appServicesPOST(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	log.Info("appServicesPOST")

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	appId := vars["appInstanceId"]

@@ -326,6 +341,11 @@ func appServicesPOST(w http.ResponseWriter, r *http.Request) {
			Href: hostUrl.String() + basePath + "applications/" + appId + "/services/" + sInfo.SerInstanceId,
		},
	}
	if sInfo.LivenessInterval != 0 {
		sInfo.Links.Liveness = &LinkType{
			Href: hostUrl.String() + basePath + "resource_uri_allocated_by_MEC_platform/" + sInfo.SerInstanceId,
		}
	}

	err, retCode := setService(appId, sInfo, ADDED_ServiceAvailabilityNotificationChangeType)
	if err != nil {
@@ -424,6 +444,22 @@ func appServicesByIdPUT(w http.ResponseWriter, r *http.Request) {
		}
	}

	// Compare LivenessInterval
	if sInfo.LivenessInterval != sInfoPrev.LivenessInterval {
		if _, ok := livenessTimerList[sInfo.SerInstanceId]; ok { // An entry already exist
			if sInfo.LivenessInterval != 0 { // update it
				updateLivenessTicker(sInfo)
			} else {
				deleteLivenessTicker(sInfo.SerInstanceId)
			}
		} else { // No entry
			if sInfo.LivenessInterval != 0 { // Create a new entry
				createLivenessTicker(sInfo)
			}
		}
	} // else, nothing to do
	sInfo.LivenessInterval = sInfoPrev.LivenessInterval

	// Send response
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, convertServiceInfoToJson(&sInfo))
@@ -730,26 +766,9 @@ func applicationsSubscriptionGET(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Create subscription link list
	subscriptionLinkList := &MecServiceMgmtApiSubscriptionLinkList{
		Links: &MecServiceMgmtApiSubscriptionLinkListLinks{
			Self: &LinkType{
				Href: hostUrl.String() + basePath + "applications/" + appId + "/subscriptions",
			},
		},
	}

	// Create subscription reference & append it to link list
	subscription := MecServiceMgmtApiSubscriptionLinkListSubscription{
		Rel:  SER_AVAILABILITY_NOTIF_SUB_TYPE,
		Href: sub.Cfg.Self,
	}
	subscriptionLinkList.Links.Subscriptions = append(subscriptionLinkList.Links.Subscriptions, subscription)

	// Send response
	// Return original marshalled subscription
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, convertMecServiceMgmtApiSubscriptionLinkListToJson(subscriptionLinkList))

	fmt.Fprintf(w, sub.JsonSubOrig)
}

func applicationsSubscriptionDELETE(w http.ResponseWriter, r *http.Request) {
@@ -874,14 +893,83 @@ func getIndividualMECService(w http.ResponseWriter, r *http.Request) {
	log.Info("getIndividualMECService")

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	vars := mux.Vars(r)
	serInstanceId := vars["serInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	if serInstanceId == "" {
		err := errors.New("wrong request parameters")
		errHandlerProblemDetails(w, err.Error(), http.StatusBadRequest)
		return
	}
	log.Info("getIndividualMECService: ", serInstanceId)

	if entry, ok := livenessTimerList[serInstanceId]; !ok {
		err := errors.New("Invalid Service instance ID")
		errHandlerProblemDetails(w, err.Error(), http.StatusNotFound)
		return
	} else {
		entry.TimeStamp = &ServiceLivenessInfoTimeStamp{
			Seconds: int32(time.Now().Unix()),
		}
		fmt.Fprint(w, convertServiceLivenessInfoToJson(&entry))
		livenessTimerList[serInstanceId] = entry
	}

	// Send response
	w.WriteHeader(http.StatusOK)
}

func patchIndividualMECService(w http.ResponseWriter, r *http.Request) {
	log.Info("patchIndividualMECService")

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	// FIXME FSCOM TODO

	vars := mux.Vars(r)
	serInstanceId := vars["serInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	if serInstanceId == "" {
		err := errors.New("wrong request parameters")
		errHandlerProblemDetails(w, err.Error(), http.StatusBadRequest)
		return
	}
	log.Info("patchIndividualMECService: ", serInstanceId)

	if entry, ok := livenessTimerList[serInstanceId]; !ok {
		err := errors.New("Invalid Service instance ID")
		errHandlerProblemDetails(w, err.Error(), http.StatusNotFound)
		return
	} else {
		// Retrieve request
		var serviceLivenessUpdate ServiceLivenessUpdate
		decoder := json.NewDecoder(r.Body)
		err := decoder.Decode(&serviceLivenessUpdate)
		if err != nil {
			log.Error(err.Error())
			errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if *serviceLivenessUpdate.State == ACTIVE_ServiceState {
			entry.State = &ACTIVE_ServiceState
		} else { // ETSI GS MEC 011 V3.2.1 (2024-04) Table 8.1.2.5-1: Attributes of ServiceLivenessUpdate
			err := errors.New("Wrong body content")
			errHandlerProblemDetails(w, err.Error(), http.StatusBadRequest)
			return
		}
		entry.TimeStamp = &ServiceLivenessInfoTimeStamp{
			Seconds: 0,
		}
		fmt.Fprint(w, convertServiceLivenessInfoToJson(&entry))
		livenessTimerList[serInstanceId] = entry
	}

	// Send response
	w.WriteHeader(http.StatusOK)
}

@@ -1008,6 +1096,12 @@ func delServiceById(appId string, svcId string) error {
	if err != nil {
		return err
	}

	// Delete Liveness timer is any
	if _, ok := livenessTimerList[svcId]; ok {
		deleteLivenessTicker(svcId)
	}

	return nil
}

@@ -1028,10 +1122,18 @@ func setService(appId string, sInfo *ServiceInfo, changeType ServiceAvailability
	// Send local service availability notifications
	checkSerAvailNotification(sInfo, mepName, changeType)

	// FIXME FSCOM
	// if sInfo.LivenessTimerList == -1 {
	// 	livenessTimerList = append(LivenessTimer{sInfo.serInstanceId, time.NewTicker(sInfo.LivenessTimerList * time.Second) })
	// }
	// Set Liveness mechanism if required
	if sInfo.LivenessInterval == 0 { // Liveness interval was ommitted
		if _, ok := livenessTimerList[sInfo.SerInstanceId]; ok {
			deleteLivenessTicker(sInfo.SerInstanceId)
		}
	} else { // Liveness interval was set
		if _, ok := livenessTimerList[sInfo.SerInstanceId]; ok { // An entry already exist, update it
			updateLivenessTicker(*sInfo)
		} else { // Create new entry
			createLivenessTicker(*sInfo)
		}
	}

	return nil, http.StatusOK
}