Commit 6c40f4c8 authored by M. Hamza's avatar M. Hamza
Browse files

add v2xMsgPublication endpoint in meep-vis

parent fa5e3d1d
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -60,5 +60,5 @@ func SubGET(w http.ResponseWriter, r *http.Request) {
}

func V2xMessagePOST(w http.ResponseWriter, r *http.Request) {
	notImplemented(w, r)
	V2xMsgPublicationPOST(w, r)
}
+9 −0
Original line number Diff line number Diff line
@@ -48,3 +48,12 @@ func convertV2xMsgSubscriptionToJson(v2xMsgSubscription *V2xMsgSubscription) str
	}
	return string(jsonInfo)
}

func convertV2xMsgPublicationToJson(v2xMsgPublication *V2xMsgPublication) string {
	jsonInfo, err := json.Marshal(*v2xMsgPublication)
	if err != nil {
		log.Error(err.Error())
		return ""
	}
	return string(jsonInfo)
}
+163 −5
Original line number Diff line number Diff line
@@ -110,6 +110,16 @@ var subscriptionExpiryMap = map[int][]int{}
var mutex sync.Mutex
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
var nextV2xMsgPubIdAvailable int = 0

const v2xMsgNotifType = "V2xMsgNotification"

type msgTypeAndStdOrgCheck struct {
	msgTypeInReq           MsgType
	stdOrgInReq            string
	subscriptionLinks      []LinkType
	callBackReferenceArray []string
}

func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
@@ -731,6 +741,154 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) {
	fmt.Fprint(w, jsonResponse)
}

func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {

	log.Info("V2xMsgPublicationPOST")
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var v2xMsgPubReq V2xMsgPublication

	bodyBytes, _ := ioutil.ReadAll(r.Body)
	err := json.Unmarshal(bodyBytes, &v2xMsgPubReq)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Validating mandatory parameters provided in the request body
	if v2xMsgPubReq.StdOrganization == "" {
		log.Error("Mandatory StdOrganization parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute StdOrganization is missing in the request body.", http.StatusBadRequest)
		return
	}

	if v2xMsgPubReq.MsgType == nil {
		log.Error("Mandatory MsgType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgType is missing in the request body.", http.StatusBadRequest)
		return
	}

	if v2xMsgPubReq.MsgEncodeFormat == "" {
		log.Error("Mandatory MsgEncodeFormat parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgEncodeFormat is missing in the request body.", http.StatusBadRequest)
		return
	}

	if v2xMsgPubReq.MsgContent == "" {
		log.Error("Mandatory MsgContent parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgContent is missing in the request body.", http.StatusBadRequest)
		return
	}

	if *v2xMsgPubReq.MsgType < 1 || *v2xMsgPubReq.MsgType > 255 {
		log.Error("MsgType parameter should be between 1 and 255")
		errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 255 in the request body.", http.StatusBadRequest)
		return
	} else if err != nil {
		log.Error("MsgType parameter should be between 1 and 255")
		errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 255 in the request body.", http.StatusBadRequest)
		return
	}

	// V2xMsgPub id will be generated sequentially
	nextV2xMsgPubIdAvailable++
	newV2xMsgPubId := nextV2xMsgPubIdAvailable
	v2xMsgPubIdStr := strconv.Itoa(newV2xMsgPubId)

	// Save V2xMsgPublication key in the Redis DB
	_ = rc.JSONSetEntry(baseKey+"V2xMsgPublication:"+v2xMsgPubIdStr, ".", convertV2xMsgPublicationToJson(&v2xMsgPubReq))

	subKey := baseKey + "subscriptions:*"
	//v2xMsgCheck
	msgTypeAndStdOrg := &msgTypeAndStdOrgCheck{
		msgTypeInReq:           *v2xMsgPubReq.MsgType,
		stdOrgInReq:            v2xMsgPubReq.StdOrganization,
		callBackReferenceArray: make([]string, 0),
		subscriptionLinks:      make([]LinkType, 0),
	}

	_ = rc.ForEachJSONEntry(subKey, compareMsgType, msgTypeAndStdOrg)

	w.WriteHeader(http.StatusNoContent)

	// create V2xMsgNotification body to send notifications
	v2xMsgNotiBody(v2xMsgPubReq, msgTypeAndStdOrg)
}

func v2xMsgNotiBody(v2xMsgPubReq V2xMsgPublication, msgTypeAndStdOrg *msgTypeAndStdOrgCheck) {

	var v2xMsgNotification V2xMsgNotification

	v2xMsgNotification.NotificationType = v2xMsgNotifType
	seconds := time.Now().Unix()
	nanoseconds := time.Now().UnixNano()
	v2xMsgNotification.TimeStamp = &TimeStamp{
		NanoSeconds: int32(nanoseconds),
		Seconds:     int32(seconds),
	}
	v2xMsgNotification.StdOrganization = msgTypeAndStdOrg.stdOrgInReq
	v2xMsgNotification.MsgType = &msgTypeAndStdOrg.msgTypeInReq
	v2xMsgNotification.MsgEncodeFormat = v2xMsgPubReq.MsgEncodeFormat
	v2xMsgNotification.MsgContent = v2xMsgPubReq.MsgContent

	for i, subLink := range msgTypeAndStdOrg.subscriptionLinks {
		v2xMsgNotification.Links.Subscription = &subLink
		notifyUrl := msgTypeAndStdOrg.callBackReferenceArray[i]
		sendV2xMsgNotification(notifyUrl, v2xMsgNotification)
	}
}

func sendV2xMsgNotification(notifyUrl string, notification V2xMsgNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err)
		return
	}

	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogNotification(notifyUrl, "POST", "", "", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
		met.ObserveNotification(sandboxName, serviceName, v2xMsgNotifType, notifyUrl, nil, duration)
		return
	}
	met.ObserveNotification(sandboxName, serviceName, v2xMsgNotifType, notifyUrl, resp, duration)
	defer resp.Body.Close()
}

func compareMsgType(subKey string, jsonInfo string, msgTypeAndStdOrg interface{}) error {

	data := msgTypeAndStdOrg.(*msgTypeAndStdOrgCheck)
	if data == nil {
		return errors.New("msgTypeAndStdOrg not found")
	}

	// Retrieve V2xMsgSubscription from DB
	var v2xMsgSub V2xMsgSubscription
	err := json.Unmarshal([]byte(jsonInfo), &v2xMsgSub)
	if err != nil {
		return err
	}

	// compare StdOrganization and MsgType provided in the V2xMsgPublication with the
	// StdOrganization and MsgType of the stored subscriptions
	if v2xMsgSub.FilterCriteria.StdOrganization == data.stdOrgInReq {
		for _, msgType := range v2xMsgSub.FilterCriteria.MsgType {
			msgTypeInt, _ := strconv.Atoi(msgType)
			if int32(msgTypeInt) == int32(data.msgTypeInReq) {
				// append CallbackReference and Links into the list of array to send Notifications on the CallbackReferences
				data.callBackReferenceArray = append(data.callBackReferenceArray, v2xMsgSub.CallbackReference)
				data.subscriptionLinks = append(data.subscriptionLinks, *v2xMsgSub.Links.Self)
			}
		}
	}

	return nil
}

func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

	log.Info("subPost")
@@ -804,13 +962,13 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

		for _, msgTypeString := range v2xSubscription.FilterCriteria.MsgType {
			msgTypeInt, err := strconv.Atoi(msgTypeString)
			if msgTypeInt < 1 || msgTypeInt > 255 {
				log.Error("MsgType parameter should be between 1 and 255")
				errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 255 in the request body.", http.StatusBadRequest)
			if msgTypeInt < 1 || msgTypeInt > 13 {
				log.Error("MsgType parameter should be between 1 and 13")
				errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 13 in the request body.", http.StatusBadRequest)
				return
			} else if err != nil {
				log.Error("MsgType parameter should be between 1 and 255")
				errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 255 in the request body.", http.StatusBadRequest)
				log.Error("MsgType parameter should be between 1 and 13")
				errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 13 in the request body.", http.StatusBadRequest)
				return
			}
		}