Commit 43994d88 authored by M. Hamza's avatar M. Hamza
Browse files

add comments in vis.go in meep-vis

parent 5c490713
Loading
Loading
Loading
Loading
+87 −22
Original line number Diff line number Diff line
@@ -110,6 +110,7 @@ var v2xMsgSubscriptionMap = map[int]*V2xMsgSubscription{}

// var provChgUuUniSubscriptionMap = map[int]*ProvChgUuUniSubscription{}
var subscriptionExpiryMap = map[int][]int{}

var mutex sync.Mutex
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
@@ -234,6 +235,11 @@ func isSubscriptionIdRegisteredV2x(subsIdStr string) bool {
	return returnVal
}

/*
* registerV2x to register new v2xMsgSubscription
* @param {struct} v2xMsgSubscription contains request body send to /subscriptions endpoint
* @param {string} subsIdStr contains an Id to uniquely subscription
 */
func registerV2x(v2xMsgSubscription *V2xMsgSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
@@ -773,14 +779,16 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) {
	fmt.Fprint(w, jsonResponse)
}

// V2xMsgPublicationPOST is to create at V2xMsgPublication /publish_v2x_message endpoint
func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {

	log.Info("V2xMsgPublicationPOST")
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var v2xMsgPubReq V2xMsgPublication

	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a V2xMsgPublication struct and store it in v2xMsgPubReq
	err := json.Unmarshal(bodyBytes, &v2xMsgPubReq)
	if err != nil {
		log.Error(err.Error())
@@ -828,26 +836,38 @@ func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
	_ = rc.JSONSetEntry(baseKey+"V2xMsgPublication:"+v2xMsgPubIdStr, ".", convertV2xMsgPublicationToJson(&v2xMsgPubReq))

	subKey := baseKey + "subscriptions:*"
	//v2xMsgCheck

	// msgTypeAndStdOrgCheck struct is passed to msgTypeAndStdOrg as a pointer
	// msgTypeAndStdOrgCheck is a struct which has four fields
	// msgTypeInReq is to get msgType provided in request body
	// stdOrgInReq is to get stdOrganization provided in request body
	// callBackReferenceArray is an empty array of the strings
	// subscriptionLinks is an empty array of the LinkType struct
	msgTypeAndStdOrg := &msgTypeAndStdOrgCheck{
		msgTypeInReq:           *v2xMsgPubReq.MsgType,
		stdOrgInReq:            v2xMsgPubReq.StdOrganization,
		callBackReferenceArray: make([]string, 0),
		subscriptionLinks:      make([]LinkType, 0),
	}

	// Retrieve subscription(s) from redis DB one by one and store in the mtsSessionInfoList array
	_ = rc.ForEachJSONEntry(subKey, compareMsgType, msgTypeAndStdOrg)

	// end response code
	w.WriteHeader(http.StatusNoContent)

	// create V2xMsgNotification body to send notifications
	v2xMsgNotifBody(v2xMsgPubReq, msgTypeAndStdOrg)
}

/*
* v2xMsgNotifBody creates ntofication body of type V2xMsgNotification
* @param {struct} v2xMsgPubReq is the request body send to /publish_v2x_message endpoint in post request
* @param {struct} msgTypeAndStdOrg contains array of _links and callbackReferences of stored subscriptions
 */
func v2xMsgNotifBody(v2xMsgPubReq V2xMsgPublication, msgTypeAndStdOrg *msgTypeAndStdOrgCheck) {

	var v2xMsgNotification V2xMsgNotification

	// make notification body of type V2xMsgNotification
	v2xMsgNotification.NotificationType = v2xMsgNotifType
	seconds := time.Now().Unix()
	nanoseconds := time.Now().UnixNano()
@@ -860,17 +880,25 @@ func v2xMsgNotifBody(v2xMsgPubReq V2xMsgPublication, msgTypeAndStdOrg *msgTypeAn
	v2xMsgNotification.MsgEncodeFormat = v2xMsgPubReq.MsgEncodeFormat
	v2xMsgNotification.MsgContent = v2xMsgPubReq.MsgContent

	// to send notifications one by one to subscribed service consumers based on the callbackReferences array
	for i, subLink := range msgTypeAndStdOrg.subscriptionLinks {
		// save _links in the v2xMsgNotification struct
		v2xMsgNotification.Links = &V2xMsgNotificationLinks{
			Subscription: &LinkType{
				Href: subLink.Href,
			},
		}
		notifyUrl := msgTypeAndStdOrg.callBackReferenceArray[i]
		// sends notifcations to subscribed service consumers
		sendV2xMsgNotification(notifyUrl, v2xMsgNotification)
	}
}

/*
* sendV2xMsgNotification sends notification to the call reference address
* @param {string} notifyUrl contains the call reference address
* @param {struct} notification contains notification body of type V2xMsgNotification
 */
func sendV2xMsgNotification(notifyUrl string, notification V2xMsgNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
@@ -891,6 +919,13 @@ func sendV2xMsgNotification(notifyUrl string, notification V2xMsgNotification) {
	defer resp.Body.Close()
}

/*
* compareMsgType compares the provided stdOrganization and msgType with the subscriptions stored in redis DB
* @param {string} subKey to retrieve subscriptions with this key
* @param {string} jsonInfo is the single subscription retrieve from redisDB
* @param {struct} msgTypeAndStdOrg struct pass to this func as msgTypeAndStdOrg struct
* @return {String} error error message
 */
func compareMsgType(subKey string, jsonInfo string, msgTypeAndStdOrg interface{}) error {

	data := msgTypeAndStdOrg.(*msgTypeAndStdOrgCheck)
@@ -921,41 +956,45 @@ func compareMsgType(subKey string, jsonInfo string, msgTypeAndStdOrg interface{}
	return nil
}

// subscriptionsPost is to create subscription at /subscriptions endpoint
func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

	log.Info("subPost")
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var extractSubType SubscriptionCommon

	var subscriptionCommon SubscriptionCommon
	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)

	err := json.Unmarshal(bodyBytes, &extractSubType)
	// Unmarshal function to converts a JSON-formatted string into a SubscriptionCommon struct and store it in extractSubType
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if extractSubType.SubscriptionType == "" {
	// Validating mandatory parameters provided in the request body
	if subscriptionCommon.SubscriptionType == "" {
		log.Error("Mandatory SubscriptionType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute SubscriptionType is missing in the request body.", http.StatusBadRequest)
		return
	}

	if extractSubType.CallbackReference == "" && extractSubType.WebsockNotifConfig == nil {
	if subscriptionCommon.CallbackReference == "" && subscriptionCommon.WebsockNotifConfig == nil {
		log.Error("At least one of CallbackReference and WebsockNotifConfig parameters should be present")
		errHandlerProblemDetails(w, "At least one of CallbackReference and WebsockNotifConfig parameters should be present.", http.StatusBadRequest)
		return
	}

	//extract subscription type
	subscriptionType := extractSubType.SubscriptionType
	subscriptionType := subscriptionCommon.SubscriptionType

	//new subscription id
	// subscriptionId will be generated sequentially
	newSubsId := nextSubscriptionIdAvailable
	nextSubscriptionIdAvailable++
	subsIdStr := strconv.Itoa(newSubsId)

	// create a unique link for every subscription and concatenate subscription to it
	link := new(Links)
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "subscriptions/" + subsIdStr
@@ -963,7 +1002,9 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

	var jsonResponse string

	// switch statement is based on provided subscriptionType in the request body
	switch subscriptionType {
	// if subscription is of type V2xMsgSubscription
	case V2X_MSG:

		var v2xSubscription V2xMsgSubscription
@@ -994,6 +1035,10 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
			return
		}

		if v2xSubscription.WebsockNotifConfig != nil {
			v2xSubscription.WebsockNotifConfig = subscriptionCommon.WebsockNotifConfig
		}

		for _, msgTypeString := range v2xSubscription.FilterCriteria.MsgType {
			msgTypeInt, err := strconv.Atoi(msgTypeString)
			if msgTypeInt < 1 || msgTypeInt > 13 {
@@ -1008,14 +1053,16 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
		}

		v2xSubscription.Links = link
		v2xSubscription.CallbackReference = extractSubType.CallbackReference
		v2xSubscription.CallbackReference = subscriptionCommon.CallbackReference

		registerV2xSub(&v2xSubscription, subsIdStr)

		// Store subscription key in redis
		_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&v2xSubscription))

		jsonResponse = convertV2xMsgSubscriptionToJson(&v2xSubscription)

	// if subscription is of type ProvChgUuUniSubscription
	case PROV_CHG_UU_UNI:
		//TODO

@@ -1024,6 +1071,7 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Prepare & send response
	w.WriteHeader(http.StatusCreated)
	fmt.Fprint(w, jsonResponse)
}
@@ -1166,6 +1214,9 @@ func registerV2xSub(v2xMsgSubscription *V2xMsgSubscription, subId string) {
	log.Info("New registration: ", subsId, " type: ", v2xSubscriptionType)
}

/*
* checkForExpiredSubscriptions delete those subscriptions whose expiryTime is reached
 */
func checkForExpiredSubscriptions() {

	nowTime := int(time.Now().Unix())
@@ -1199,7 +1250,6 @@ func checkForExpiredSubscriptions() {

				notif.TimeStamp = &timeStamp
				notif.ExpiryDeadline = &expiryTimeStamp

				sendExpiryNotification(link.Subscription.Href, notif)
				_ = delSubscription(baseKey, subsIdStr, true)
			}
@@ -1207,6 +1257,11 @@ func checkForExpiredSubscriptions() {
	}
}

/*
* sendExpiryNotification send expiry notification to the the corresponding callback reference address
* @param {string} notifyUrl contains callback reference address of service consumer
* @param {struct} notification struct of type ExpiryNotification
 */
func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
@@ -1226,6 +1281,9 @@ func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
	defer resp.Body.Close()
}

/*
* delSubscription delete expired subscriptions from redis DB
 */
func delSubscription(keyPrefix string, subsId string, mutexTaken bool) error {

	err := rc.JSONDelEntry(keyPrefix+":"+subsId, ".")
@@ -1334,8 +1392,8 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {

	switch subscriptionType {
	case V2X_MSG:
		var subscription V2xMsgSubscription
		err = json.Unmarshal(bodyBytes, &subscription)
		var v2xSubscription V2xMsgSubscription
		err = json.Unmarshal(bodyBytes, &v2xSubscription)
		if err != nil {
			log.Error(err.Error())
			errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
@@ -1350,13 +1408,17 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
			return
		}

		if subscription.FilterCriteria.StdOrganization == "" {
		if v2xSubscription.FilterCriteria.StdOrganization == "" {
			log.Error("Mandatory StdOrganization parameter should be present")
			errHandlerProblemDetails(w, "Mandatory attribute StdOrganization is missing in the request body.", http.StatusBadRequest)
			return
		}

		for _, msgTypeString := range subscription.FilterCriteria.MsgType {
		if v2xSubscription.WebsockNotifConfig != nil {
			v2xSubscription.WebsockNotifConfig = subscriptionCommon.WebsockNotifConfig
		}

		for _, msgTypeString := range v2xSubscription.FilterCriteria.MsgType {
			msgTypeInt, err := strconv.Atoi(msgTypeString)
			if msgTypeInt < 1 || msgTypeInt > 13 {
				log.Error("MsgType parameter should be between 1 and 13")
@@ -1371,13 +1433,16 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {

		//registration
		if isSubscriptionIdRegisteredV2x(subsIdStr) {
			registerV2x(&subscription, subsIdStr)
			_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&subscription))
			registerV2x(&v2xSubscription, subsIdStr)
			_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&v2xSubscription))
			alreadyRegistered = true
			jsonResponse, err = json.Marshal(subscription)
			jsonResponse, err = json.Marshal(v2xSubscription)
		}
	case PROV_CHG_UU_UNI:
		//TODO

	default:
		w.WriteHeader(http.StatusBadRequest)
		log.Error("Unsupported subscriptionType")
		return
	}