Commit aab1933c authored by Yann Garcia's avatar Yann Garcia
Browse files

Resolve conflict

parents 9ed3252d d7e9202d
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -28,5 +28,5 @@ type SubscriptionLinkListLinksSubscriptions struct {
	// The URI referring to the subscription.
	Href string `json:"href"`
	// Type of the subscription. The values are as defined in the \"subscriptionType\" attribute for each different V2X information event subscription data type.
	SubscriptionType string `json:"subscriptionType,omitempty"`
	SubscriptionType string `json:"subscriptionType"`
}
+111 −29
Original line number Diff line number Diff line
@@ -114,6 +114,7 @@ var v2xMsgSubscriptionMap = map[int]*V2xMsgSubscription{}

// var provChgUuUniSubscriptionMap = map[int]*ProvChgUuUniSubscription{}
var subscriptionExpiryMap = map[int][]int{}

var mutex sync.Mutex
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
@@ -239,6 +240,11 @@ func isSubscriptionIdRegisteredV2x(subsIdStr string) bool {
	return returnVal
}

/*
* registerV2x to register new v2xMsgSubscription
* @param {struct} v2xMsgSubscription contains request body send to /subscriptions endpoint
* @param {string} subsIdStr contains an Id to uniquely subscription
 */
func registerV2x(v2xMsgSubscription *V2xMsgSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
@@ -795,14 +801,16 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) {
	fmt.Fprint(w, jsonResponse)
}

// V2xMsgPublicationPOST is to create at V2xMsgPublication /publish_v2x_message endpoint
func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {

	log.Info("V2xMsgPublicationPOST: ", r)
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var v2xMsgPubReq V2xMsgPublication

	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a V2xMsgPublication struct and store it in v2xMsgPubReq
	err := json.Unmarshal(bodyBytes, &v2xMsgPubReq)
	if err != nil {
		log.Error(err.Error())
@@ -863,16 +871,22 @@ func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
	_ = rc.JSONSetEntry(baseKey+"V2xMsgPublication:"+v2xMsgPubIdStr, ".", convertV2xMsgPublicationToJson(&v2xMsgPubReq))

	subKey := baseKey + "subscriptions:*"
	//v2xMsgCheck

	// msgTypeAndStdOrgCheck struct is passed to msgTypeAndStdOrg as a pointer
	// msgTypeAndStdOrgCheck is a struct which has four fields
	// msgTypeInReq is to get msgType provided in request body
	// stdOrgInReq is to get stdOrganization provided in request body
	// callBackReferenceArray is an empty array of the strings
	// subscriptionLinks is an empty array of the LinkType struct
	msgTypeAndStdOrg := &msgTypeAndStdOrgCheck{
		msgTypeInReq:           *v2xMsgPubReq.MsgType,
		stdOrgInReq:            v2xMsgPubReq.StdOrganization,
		callBackReferenceArray: make([]string, 0),
		subscriptionLinks:      make([]LinkType, 0),
	}

	// Retrieve subscription(s) from redis DB one by one
	_ = rc.ForEachJSONEntry(subKey, compareMsgType, msgTypeAndStdOrg)

	// end response code
	w.WriteHeader(http.StatusNoContent)

	// create V2xMsgNotification body to send notifications
@@ -880,10 +894,16 @@ func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
	***/
}

/*
* v2xMsgNotifBody creates ntofication body of type V2xMsgNotification
* @param {struct} v2xMsgPubReq is the request body send to /publish_v2x_message endpoint in post request
* @param {struct} msgTypeAndStdOrg contains array of _links and callbackReferences of stored subscriptions
 */
// func v2xMsgNotifBody(v2xMsgPubReq V2xMsgPublication, msgTypeAndStdOrg *msgTypeAndStdOrgCheck) {

// 	var v2xMsgNotification V2xMsgNotification

// 	// make notification body of type V2xMsgNotification
// 	v2xMsgNotification.NotificationType = v2xMsgNotifType
// 	seconds := time.Now().Unix()
// 	nanoseconds := time.Now().UnixNano()
@@ -907,6 +927,11 @@ func V2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
// 	}
// }

/*
* sendV2xMsgNotification sends notification to the call reference address
* @param {string} notifyUrl contains the call reference address
* @param {struct} notification contains notification body of type V2xMsgNotification
 */
func sendV2xMsgNotification(notifyUrl string, notification V2xMsgNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
@@ -959,41 +984,45 @@ func sendV2xMsgNotification(notifyUrl string, notification V2xMsgNotification) {
// 	return nil
// }

// subscriptionsPost is to create subscription at /subscriptions endpoint
func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

	log.Info("subPost")
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	var extractSubType SubscriptionCommon

	var subscriptionCommon SubscriptionCommon
	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)

	err := json.Unmarshal(bodyBytes, &extractSubType)
	// Unmarshal function to converts a JSON-formatted string into a SubscriptionCommon struct and store it in extractSubType
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if extractSubType.SubscriptionType == "" {
	// Validating mandatory parameters provided in the request body
	if subscriptionCommon.SubscriptionType == "" {
		log.Error("Mandatory SubscriptionType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute SubscriptionType is missing in the request body.", http.StatusBadRequest)
		return
	}

	if extractSubType.CallbackReference == "" && extractSubType.WebsockNotifConfig == nil {
	if subscriptionCommon.CallbackReference == "" && subscriptionCommon.WebsockNotifConfig == nil {
		log.Error("At least one of CallbackReference and WebsockNotifConfig parameters should be present")
		errHandlerProblemDetails(w, "At least one of CallbackReference and WebsockNotifConfig parameters should be present.", http.StatusBadRequest)
		return
	}

	//extract subscription type
	subscriptionType := extractSubType.SubscriptionType
	subscriptionType := subscriptionCommon.SubscriptionType

	//new subscription id
	// subscriptionId will be generated sequentially
	newSubsId := nextSubscriptionIdAvailable
	nextSubscriptionIdAvailable++
	subsIdStr := strconv.Itoa(newSubsId)

	// create a unique link for every subscription and concatenate subscription to it
	link := new(Links)
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "subscriptions/" + subsIdStr
@@ -1001,7 +1030,9 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {

	var jsonResponse string

	// switch statement is based on provided subscriptionType in the request body
	switch subscriptionType {
	// if subscription is of type V2xMsgSubscription
	case V2X_MSG:

		var v2xSubscription V2xMsgSubscription
@@ -1032,6 +1063,14 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
			return
		}

		if v2xSubscription.WebsockNotifConfig != nil {
			v2xSubscription.WebsockNotifConfig = subscriptionCommon.WebsockNotifConfig
		}

		if v2xSubscription.CallbackReference != "" {
			v2xSubscription.CallbackReference = subscriptionCommon.CallbackReference
		}

		if !checkMsgTypeValue(v2xSubscription.FilterCriteria.MsgType) {
			log.Error("MsgType parameter should be between 1 and 13")
			errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 13 in the request body.", http.StatusBadRequest)
@@ -1039,14 +1078,15 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
		}

		v2xSubscription.Links = link
		v2xSubscription.CallbackReference = extractSubType.CallbackReference

		registerV2xSub(&v2xSubscription, subsIdStr)

		// Store subscription key in redis
		_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&v2xSubscription))

		jsonResponse = convertV2xMsgSubscriptionToJson(&v2xSubscription)

	// if subscription is of type ProvChgUuUniSubscription
	case PROV_CHG_UU_UNI:
		//TODO

@@ -1055,6 +1095,7 @@ func subscriptionsPost(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Prepare & send response
	w.WriteHeader(http.StatusCreated)
	fmt.Fprint(w, jsonResponse)
}
@@ -1075,7 +1116,7 @@ func createSubscriptionLinkList(subType string) *SubscriptionLinkList {
	mutex.Lock()
	defer mutex.Unlock()

	//loop through cell_change map
	//loop through v2x_msg map
	if subType == "" || subType == "v2x_msg" {
		for _, v2xSubscription := range v2xMsgSubscriptionMap {
			if v2xSubscription != nil {
@@ -1091,10 +1132,12 @@ func createSubscriptionLinkList(subType string) *SubscriptionLinkList {
	return subscriptionLinkList
}

// subscriptionsGET is to retrieve information about all existing subscriptions at /subscriptions endpoint
func subscriptionsGET(w http.ResponseWriter, r *http.Request) {
	log.Info("subGet")
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	// get & validate query param values for subscription_type
	u, _ := url.Parse(r.URL.String())
	log.Info("url: ", u.RequestURI())
	q := u.Query()
@@ -1138,18 +1181,23 @@ func subscriptionsGET(w http.ResponseWriter, r *http.Request) {

	}

	// get the response against particular subscription type
	response := createSubscriptionLinkList(subType)

	// prepare & send response
	jsonResponse, err := json.Marshal(response)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// success response code
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, string(jsonResponse))
}

// individualSubscriptionGET is to retrive a specific subscriptionsInfo at /subscriptions/{subscriptionId} endpoint
func individualSubscriptionGET(w http.ResponseWriter, r *http.Request) {
	log.Info("individualSubGet")

@@ -1205,6 +1253,9 @@ func registerV2xSub(v2xMsgSubscription *V2xMsgSubscription, subId string) {
	}
}

/*
* checkForExpiredSubscriptions delete those subscriptions whose expiryTime is reached
 */
func checkForExpiredSubscriptions() {

	nowTime := int(time.Now().Unix())
@@ -1238,7 +1289,6 @@ func checkForExpiredSubscriptions() {

				notif.TimeStamp = &timeStamp
				notif.ExpiryDeadline = &expiryTimeStamp

				sendExpiryNotification(link.Subscription.Href, notif)
				_ = delSubscription(baseKey, subsIdStr, true)
			}
@@ -1246,6 +1296,11 @@ func checkForExpiredSubscriptions() {
	}
}

/*
* sendExpiryNotification send expiry notification to the the corresponding callback reference address
* @param {string} notifyUrl contains callback reference address of service consumer
* @param {struct} notification struct of type ExpiryNotification
 */
func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
@@ -1265,6 +1320,9 @@ func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
	defer resp.Body.Close()
}

/*
* delSubscription delete expired subscriptions from redis DB
 */
func delSubscription(keyPrefix string, subsId string, mutexTaken bool) error {

	err := rc.JSONDelEntry(keyPrefix+":"+subsId, ".")
@@ -1320,6 +1378,7 @@ func repopulateV2xMsgSubscriptionMap(key string, jsonInfo string, userData inter
	return nil
}

// individualSubscriptionPut updates the information about a specific subscriptionInfo at /subscriptions/{subscriptionId} endpoint
func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	log.Info("individualSubPut")

@@ -1328,7 +1387,9 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	subIdParamStr := vars["subscriptionId"]

	var subscriptionCommon SubscriptionCommon
	// read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a SubscriptionCommon struct and store it in extractSubType
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
@@ -1338,7 +1399,7 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	// extract common body part
	subscriptionType := subscriptionCommon.SubscriptionType

	// Validating mandatory parameters provided in the request body
	// validating common mandatory parameters provided in the request body
	if subscriptionCommon.SubscriptionType == "" {
		log.Error("Mandatory SubscriptionType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute SubscriptionType is missing in the request body.", http.StatusBadRequest)
@@ -1347,7 +1408,7 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {

	if subscriptionCommon.CallbackReference == "" && subscriptionCommon.WebsockNotifConfig == nil {
		log.Error("At least one of callbackReference and websockNotifConfig parameters should be present")
		errHandlerProblemDetails(w, "At least one of callbackReference and websockNotifConfig parameters should be present.", http.StatusBadRequest)
		errHandlerProblemDetails(w, "Both callbackReference and websockNotifConfig parameters are missing in the request body.", http.StatusBadRequest)
		return
	}

@@ -1376,10 +1437,12 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	alreadyRegistered := false
	var jsonResponse []byte

	// switch statement is based on provided subscriptionType in the request body
	switch subscriptionType {
	// if subscription is of type V2xMsgSubscription
	case V2X_MSG:
		var subscription V2xMsgSubscription
		err = json.Unmarshal(bodyBytes, &subscription)
		var v2xSubscription V2xMsgSubscription
		err = json.Unmarshal(bodyBytes, &v2xSubscription)
		if err != nil {
			log.Error(err.Error())
			errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
@@ -1388,19 +1451,28 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {

		v2xMsgSubscription, _ := rc.JSONGetEntry(baseKey+"subscriptions:"+subIdParamStr, ".")

		// Validating mandatory parameters specific to V2xMsgSubscription in the request body
		if v2xMsgSubscription == "" {
			log.Error("subscription not found against the provided subscriptionId")
			errHandlerProblemDetails(w, "subscription not found against the provided subscriptionId", http.StatusNotFound)
			return
		}

		if subscription.FilterCriteria.StdOrganization == "" {
		if v2xSubscription.FilterCriteria.StdOrganization == "" {
			log.Error("Mandatory StdOrganization parameter should be present")
			errHandlerProblemDetails(w, "Mandatory attribute StdOrganization is missing in the request body.", http.StatusBadRequest)
			return
		}

		if !checkMsgTypeValue(subscription.FilterCriteria.MsgType) {
		if v2xSubscription.WebsockNotifConfig != nil {
			v2xSubscription.WebsockNotifConfig = subscriptionCommon.WebsockNotifConfig
		}

		if v2xSubscription.CallbackReference != "" {
			v2xSubscription.CallbackReference = subscriptionCommon.CallbackReference
		}

		if !checkMsgTypeValue(v2xSubscription.FilterCriteria.MsgType) {
			log.Error("MsgType parameter should be between 1 and 13")
			errHandlerProblemDetails(w, "MsgType parameter should be between 1 and 13 in the request body.", http.StatusBadRequest)
			return
@@ -1408,13 +1480,19 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {

		// registration
		if isSubscriptionIdRegisteredV2x(subsIdStr) {
			registerV2x(&subscription, subsIdStr)
			_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&subscription))
			registerV2x(&v2xSubscription, subsIdStr)
			// store subscription key in redis
			_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertV2xMsgSubscriptionToJson(&v2xSubscription))
			alreadyRegistered = true
			jsonResponse, err = json.Marshal(subscription)
			jsonResponse, err = json.Marshal(v2xSubscription)
		}

	// if subscription is of type ProvChgUuUniSubscription
	case PROV_CHG_UU_UNI:
		//TODO

	default:
		w.WriteHeader(http.StatusBadRequest)
		log.Error("Unsupported subscriptionType")
		return
	}

@@ -1431,6 +1509,7 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	}
}

// individualSubscriptionDelete is to delete a specific subscriptionInfo at subscriptions/{subscriptionId} endpoint
func individualSubscriptionDelete(w http.ResponseWriter, r *http.Request) {
	log.Info("individualSubDel")

@@ -1438,20 +1517,23 @@ func individualSubscriptionDelete(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	subIdParamStr := vars["subscriptionId"]

	// Find subscriptionInfo entry in redis DB
	_, err := rc.JSONGetEntry(baseKey+"subscriptions:"+subIdParamStr, ".")
	if err != nil {
		err = errors.New("mtsSessionInfo not found against the provided the sessionId")
		err = errors.New("subscription not found against the provided subscriptionId")
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusNotFound)
		return
	}

	// Delete subscriptionInfo entry from redis DB
	err = delSubscription(baseKey+"subscriptions", subIdParamStr, false)
	if err != nil {
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send response on successful deletion of subscription resource
	w.WriteHeader(http.StatusNoContent)
}

+1 −20
Original line number Diff line number Diff line
@@ -910,10 +910,6 @@ func testSubscriptionsGet(t *testing.T, subscriptionTypeQuery string, expectedRe
	 ******************************/
	//passed as a parameter since a POST had to be sent first

	/******************************
	 * request body section
	 ******************************/

	/******************************
	 * request queries section
	 ******************************/
@@ -931,7 +927,7 @@ func testSubscriptionsGet(t *testing.T, subscriptionTypeQuery string, expectedRe
			t.Fatalf("Failed to get expected response")
		}
	} else {
		rr, err := sendRequest(http.MethodGet, "/vis/v2/subscriptions", nil, queryParam, nil, http.StatusOK, SubGET)
		rr, err := sendRequest(http.MethodGet, "/vis/v2/subscriptions", nil, nil, queryParam, http.StatusOK, SubGET)
		if err != nil {
			t.Fatalf("Failed to get expected response")
		}
@@ -956,14 +952,6 @@ func testIndividualSubscriptionGet(t *testing.T, expectedResponse string) {
	vars := make(map[string]string)
	vars["subscriptionId"] = "1"

	/******************************
	 * request body section
	 ******************************/

	/******************************
	 * request queries section
	 ******************************/

	/******************************
	 * request execution section
	 ******************************/
@@ -1018,7 +1006,6 @@ func testIndividualSubscriptionPut(t *testing.T, expectSuccess bool) string {

	expected_subscriptionType := "V2xMsgSubscription"
	expected_callbackReference := "MyCallback"

	expected_href := LinkType{Href: "http://meAppServer.example.com/vis/v2/subscriptions/123"}
	expected_self := Links{Self: &expected_href}
	//expected_link := V2xMsgSubscription{Links: &expected_self}
@@ -1047,7 +1034,6 @@ func testIndividualSubscriptionPut(t *testing.T, expectSuccess bool) string {
	msgType := []MsgType{DENM, CAM}
	//expected_stdOrganization :=
	filterCriteria := V2xMsgSubscriptionFilterCriteria{StdOrganization: "ETSI", MsgType: msgType}

	testv2xMsgSubscription := V2xMsgSubscription{CallbackReference: callbackReference, FilterCriteria: &filterCriteria, RequestTestNotification: false, SubscriptionType: subscriptionType, WebsockNotifConfig: nil}
	body, err := json.Marshal(testv2xMsgSubscription)
	if err != nil {
@@ -1055,15 +1041,10 @@ func testIndividualSubscriptionPut(t *testing.T, expectSuccess bool) string {
	}
	fmt.Println("body: ", string(body))

	/******************************
	 * request queries section
	 ******************************/

	/******************************
	 * request execution section
	 ******************************/

	// TODO change status code
	rr, err := sendRequest(http.MethodPut, "/vis/v2/subscriptions/1", bytes.NewBuffer(body), nil, nil, http.StatusOK, IndividualSubscriptionPUT)
	if err != nil {
		t.Fatalf(err.Error())