Commit de1e12c2 authored by Yann Garcia's avatar Yann Garcia
Browse files

Add V2xMsgDistributionServerInfo support

parent 0a631197
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -395,7 +395,6 @@ func processActiveScenarioUpdate() {
		}

	}

}

func initializeV2xMessageDistribution() (err error) {
+146 −70
Original line number Diff line number Diff line
@@ -808,75 +808,6 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) {
	fmt.Fprint(w, jsonResponse)
}

func v2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
	log.Info(">>> V2xMsgPublicationPOST: ", r)

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a V2xMsgPublication struct and store it in v2xMsgPubReq
	var v2xMsgPubReq V2xMsgPublication
	err := json.Unmarshal(bodyBytes, &v2xMsgPubReq)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Validating mandatory parameters provided in the request body
	if v2xMsgPubReq.MsgPropertiesValues == nil { // ETSI GS MEC 030 V3.1.1 Clause 6.2.7 Type: V2xMsgPublication
		log.Error("Mandatory MsgPropertiesValues parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgPropertiesValues is missing in the request body.", http.StatusBadRequest)
		return
	}

	var msgPropertiesValues V2xMsgPropertiesValues = *v2xMsgPubReq.MsgPropertiesValues
	if msgPropertiesValues.StdOrganization == "" {
		log.Error("Mandatory StdOrganization parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute StdOrganization is missing in the request body.", http.StatusBadRequest)
		return
	}

	if msgPropertiesValues.MsgType == "" {
		log.Error("Mandatory MsgType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgType is missing in the request body.", http.StatusBadRequest)
		return
	}
	var msgType int32 = parseMsgTypeToInt(msgPropertiesValues.MsgType)
	if msgType == -1 {
		log.Error("Mandatory MsgType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgType is missing in the request body.", http.StatusBadRequest)
		return
	}

	if v2xMsgPubReq.MsgContent == "" {
		log.Error("Mandatory MsgContent parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgContent is missing in the request body.", http.StatusBadRequest)
		return
	}

	if len(v2xMsgSubscriptionMap) != 0 { // There are some subscription ongoing, we can publish it
		// Publish message on message broker
		err = sbi.PublishMessageOnMessageBroker(v2xMsgPubReq.MsgContent, v2xMsgPubReq.MsgRepresentationFormat, msgPropertiesValues.StdOrganization, &msgType)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
		}
		w.WriteHeader(http.StatusNoContent)
	} else { // No subscription ongoing, discard it
		log.Error("No subscription ongoing, discard it")
		errHandlerProblemDetails(w, "No subscription ongoing, discard it.", http.StatusBadRequest)
		return
	}
}

func v2xMsgDistributionServerPost(w http.ResponseWriter, r *http.Request) {
	log.Info(">>> v2xMsgDistributionServerPost: ", r)

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusOK)
}

/*
* sendV2xMsgNotification sends notification to the call reference address
* @param {string} notifyUrl contains the call reference address
@@ -1965,7 +1896,7 @@ func individualSubscriptionPut(w http.ResponseWriter, r *http.Request) {
	log.Info("subsIdStr: ", subsIdStr)

	var subscriptionCommon SubscriptionCommon
	// read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a SubscriptionCommon struct and store it in extractSubType
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
@@ -2399,6 +2330,151 @@ func provInfoPc5GET(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, string(jsonResponse))
}

func v2xMsgPublicationPOST(w http.ResponseWriter, r *http.Request) {
	log.Info(">>> V2xMsgPublicationPOST: ", r)

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a V2xMsgPublication struct and store it in v2xMsgPubReq
	var v2xMsgPubReq V2xMsgPublication
	err := json.Unmarshal(bodyBytes, &v2xMsgPubReq)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Validating mandatory parameters provided in the request body
	if v2xMsgPubReq.MsgPropertiesValues == nil { // ETSI GS MEC 030 V3.1.1 Clause 6.2.7 Type: V2xMsgPublication
		log.Error("Mandatory MsgPropertiesValues parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgPropertiesValues is missing in the request body.", http.StatusBadRequest)
		return
	}

	var msgPropertiesValues V2xMsgPropertiesValues = *v2xMsgPubReq.MsgPropertiesValues
	if msgPropertiesValues.StdOrganization == "" {
		log.Error("Mandatory StdOrganization parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute StdOrganization is missing in the request body.", http.StatusBadRequest)
		return
	}

	if msgPropertiesValues.MsgType == "" {
		log.Error("Mandatory MsgType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgType is missing in the request body.", http.StatusBadRequest)
		return
	}
	var msgType int32 = parseMsgTypeToInt(msgPropertiesValues.MsgType)
	if msgType == -1 {
		log.Error("Mandatory MsgType parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgType is missing in the request body.", http.StatusBadRequest)
		return
	}

	if v2xMsgPubReq.MsgContent == "" {
		log.Error("Mandatory MsgContent parameter should be present")
		errHandlerProblemDetails(w, "Mandatory attribute MsgContent is missing in the request body.", http.StatusBadRequest)
		return
	}

	if len(v2xMsgSubscriptionMap) != 0 { // There are some subscription ongoing, we can publish it
		// Publish message on message broker
		err = sbi.PublishMessageOnMessageBroker(v2xMsgPubReq.MsgContent, v2xMsgPubReq.MsgRepresentationFormat, msgPropertiesValues.StdOrganization, &msgType)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
		}
		w.WriteHeader(http.StatusNoContent)
	} else { // No subscription ongoing, discard it
		log.Error("No subscription ongoing, discard it")
		errHandlerProblemDetails(w, "No subscription ongoing, discard it.", http.StatusBadRequest)
		return
	}
}

func v2xMsgDistributionServerPost(w http.ResponseWriter, r *http.Request) {
	log.Info(">>> v2xMsgDistributionServerPost: ", r)

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	// Read JSON input stream provided in the Request, and stores it in the bodyBytes as bytes
	var v2xMsgDistributionServerInfo V2xMsgDistributionServerInfo
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	// Unmarshal function to converts a JSON-formatted string into a SubscriptionCommon struct and store it in extractSubType
	err := json.Unmarshal(bodyBytes, &v2xMsgDistributionServerInfo)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusInternalServerError)
		return
	}
	log.Info("v2xMsgDistributionServerInfo: ", v2xMsgDistributionServerInfo)

	if len(v2xMsgDistributionServerInfo.V2xMsgDistributionServer) == 0 {
		log.Error("At least one V2xMsgDistributionServer parameters should be present")
		errHandlerProblemDetails(w, "V2xMsgDistributionServer parameters are missing in the request body.", http.StatusBadRequest)
		return
	} else {
		for _, v2xMsgDistributionServer := range v2xMsgDistributionServerInfo.V2xMsgDistributionServer {
			if v2xMsgDistributionServer.InfoConnection != nil {
				log.Error("InfoConnection parameters shall not be present")
				errHandlerProblemDetails(w, "InfoConnection parameters shall not be present", http.StatusBadRequest)
				return
			}
			if v2xMsgDistributionServer.InfoProtocol == nil {
				log.Error("At least one InfoProtocol parameters should be present")
				errHandlerProblemDetails(w, "InfoProtocol parameters are missing in the request body.", http.StatusBadRequest)
				return
			} else {
				if len(v2xMsgDistributionServer.InfoProtocol.MsgProtocol) == 0 {
					log.Error("At least one MsgProtocol parameters should be present")
					errHandlerProblemDetails(w, "MsgProtocol parameters are missing in the request body.", http.StatusBadRequest)
					return
				}
			}
		} // End of 'for'statement
	}

	u, err := url.ParseRequestURI(v2x_broker)
	log.Info("v2xMsgDistributionServerPost: u: ", u)
	if err != nil {
		log.Error(err.Error())
		return
	}
	//log.Info("v2xMsgDistributionServerPost: url:%v - scheme:%v - host:%v - Path:%v - Port:%s", u, u.Scheme, u.Hostname(), u.Path, u.Port())
	portNumber, err := strconv.Atoi(u.Port())
	if err != nil {
		log.Error(err.Error())
		return
	}

	v2xMsgDistributionServerInfoResp := v2xMsgDistributionServerInfo // Same format
	for _, v2xMsgDistributionServer := range v2xMsgDistributionServerInfoResp.V2xMsgDistributionServer {
		for _, msgProtocol := range v2xMsgDistributionServer.InfoProtocol.MsgProtocol {
			if msgProtocol == 0 { // MQTT v3.1.0
				v2xMsgDistributionServer.InfoConnection = &InfoConnection{IpAddress: u.Hostname(), PortNumber: int32(portNumber)}
			} else if msgProtocol == 1 { // MQTT v3.1.1
				v2xMsgDistributionServer.InfoConnection = &InfoConnection{IpAddress: u.Hostname(), PortNumber: int32(portNumber)}
			} else {
				v2xMsgDistributionServer.InfoConnection = nil
				log.Warn("v2xMsgDistributionServerPost: Unsupported MsgProtocol: ", msgProtocol)
			}
		} // End of 'for'statement
	} // End of 'for'statement

	log.Info("v2xMsgDistributionServerPost: ", v2xMsgDistributionServerInfoResp)

	jsonResponse, err := json.Marshal(v2xMsgDistributionServerInfoResp)
	if err != nil {
		log.Error(err.Error())
		errHandlerProblemDetails(w, err.Error(), http.StatusBadRequest)
		return
	}
	log.Info("jsonResponse: ", jsonResponse)

	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, string(jsonResponse))
}

func v2xNotify(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32) {
	log.Info(">>> v2xNotify: ", v2xMessage)

+211 −1
Original line number Diff line number Diff line
@@ -678,7 +678,6 @@ func TestProvInfoUuUnicastGET(t *testing.T) {
	/******************************
	 * expected request section
	 ******************************/

	ecgi_1 := Ecgi{
		CellId: &CellId{CellId: "2345678"},
		Plmn:   &Plmn{Mcc: "123", Mnc: "456"},
@@ -2075,6 +2074,217 @@ func TestV2xMsgPublicationPost(t *testing.T) {
	terminateScenario()
}

func TestV2xMsgDistributionServerPost(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())
	initializeVars()
	err := Init()
	if err != nil {
		t.Fatalf("Error initializing test basic procedure")
	}
	err = Run()
	if err != nil {
		t.Fatalf("Error running test basic procedure")
	}
	fmt.Println("Set a scenario")
	initialiseScenario(testScenario)
	time.Sleep(1000 * time.Millisecond)
	updateScenario("mobility1")

	/******************************
	 * expected response section
	 ******************************/
	// Initialize the data structure for the POST request
	// MEC-030 Clause 6.2.6
	// MEC-030 Clause 7.7.3.4
	/******************************
	 * expected request section
	 ******************************/
	locationInfo := make([]LocationInfo, 0)
	msgProtocol := make([]int32, 1)
	msgProtocol[0] = 0
	infoProtocol := &InfoProtocol{MsgProtocol: msgProtocol, ProtImplementation: ""}
	expected_infoConnection := &InfoConnection{IpAddress: "test.mosquito.org", PortNumber: 1338}
	expected_v2xMsgDistributionServer := make([]V2xMsgDistributionServer, 1)
	expected_v2xMsgDistributionServer[0] = V2xMsgDistributionServer{InfoConnection: expected_infoConnection, InfoProtocol: infoProtocol}
	expected_v2xMsgDistributionServerInfo := V2xMsgDistributionServerInfo{
		LocationInfo:             locationInfo,
		V2xMsgDistributionServer: expected_v2xMsgDistributionServer,
	}
	expected_json_response, err := json.Marshal(expected_v2xMsgDistributionServerInfo)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("expected_json_response: ", string(expected_json_response))

	/******************************
	 * request execution section
	 ******************************/
	v2xMsgDistributionServer := make([]V2xMsgDistributionServer, 1)
	v2xMsgDistributionServer[0] = V2xMsgDistributionServer{InfoConnection: nil, InfoProtocol: infoProtocol}
	v2xMsgDistributionServerInfo := V2xMsgDistributionServerInfo{
		LocationInfo:             locationInfo,
		V2xMsgDistributionServer: v2xMsgDistributionServer,
	}
	body, err := json.Marshal(v2xMsgDistributionServerInfo)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("body: ", string(body))
	rr, err := sendRequest(http.MethodPost, "/vis/v2/provide_v2x_msg_distribution_server_info", bytes.NewBuffer(body), nil, nil, nil, http.StatusOK, V2xMsgDistributionServerPost)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("Respone: rr: ", rr)
	var resp V2xMsgDistributionServerInfo
	err = json.Unmarshal([]byte(rr), &resp)
	if err != nil {
		t.Fatalf("err.Error()")
	}
	fmt.Println("Respone: resp: ", resp)

	if !validateV2xMsgDistributionServerResponse(resp, expected_v2xMsgDistributionServerInfo) {
		t.Errorf("handler returned unexpected body: got %v want %v", rr, expected_json_response)
	}

	/******************************
	 * back to initial state section
	 ******************************/

	terminateScenario()
}

func TestFailV2xMsgDistributionServerPost(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())
	initializeVars()
	err := Init()
	if err != nil {
		t.Fatalf("Error initializing test basic procedure")
	}
	err = Run()
	if err != nil {
		t.Fatalf("Error running test basic procedure")
	}
	fmt.Println("Set a scenario")
	initialiseScenario(testScenario)
	time.Sleep(1000 * time.Millisecond)
	updateScenario("mobility1")

	/******************************
	 * expected response section
	 ******************************/
	// Initialize the data structure for the POST request
	// MEC-030 Clause 6.2.6
	// MEC-030 Clause 7.7.3.4
	/******************************
	 * expected request section
	 ******************************/
	locationInfo := make([]LocationInfo, 0)
	msgProtocol := make([]int32, 1)
	msgProtocol[0] = 0
	infoProtocol := &InfoProtocol{MsgProtocol: msgProtocol, ProtImplementation: ""}
	expected_infoConnection := &InfoConnection{IpAddress: "test.mosquito.org", PortNumber: 1338}
	expected_v2xMsgDistributionServer := make([]V2xMsgDistributionServer, 1)
	expected_v2xMsgDistributionServer[0] = V2xMsgDistributionServer{InfoConnection: expected_infoConnection, InfoProtocol: infoProtocol}
	expected_v2xMsgDistributionServerInfo := V2xMsgDistributionServerInfo{
		LocationInfo:             locationInfo,
		V2xMsgDistributionServer: expected_v2xMsgDistributionServer,
	}
	expected_json_response, err := json.Marshal(expected_v2xMsgDistributionServerInfo)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("expected_json_response: ", string(expected_json_response))

	/******************************
	 * request execution section
	 ******************************/
	body, err := json.Marshal(expected_v2xMsgDistributionServerInfo) // Error: InfoConnection field is present :(
	if err != nil {
		t.Fatalf(err.Error())
	}
	_, err = sendRequest(http.MethodPost, "/vis/v2/provide_v2x_msg_distribution_server_info", bytes.NewBuffer(body), nil, nil, nil, http.StatusBadRequest, V2xMsgDistributionServerPost)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("Request done")

	expected_v2xMsgDistributionServerInfo.V2xMsgDistributionServer[0].infoProtocol.msgProtocol = make([]int32, 0) // No message protocol
	body, err := json.Marshal(expected_v2xMsgDistributionServerInfo)                                              // Error: InfoConnection field is present :(
	if err != nil {
		t.Fatalf(err.Error())
	}
	_, err = sendRequest(http.MethodPost, "/vis/v2/provide_v2x_msg_distribution_server_info", bytes.NewBuffer(body), nil, nil, nil, http.StatusBadRequest, V2xMsgDistributionServerPost)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("Request done")

	expected_v2xMsgDistributionServerInfo.V2xMsgDistributionServer = make([]V2xMsgDistributionServer, 0) // No V2xMsgDistributionServer
	body, err := json.Marshal(expected_v2xMsgDistributionServerInfo)                                     // Error: InfoConnection field is present :(
	if err != nil {
		t.Fatalf(err.Error())
	}
	_, err = sendRequest(http.MethodPost, "/vis/v2/provide_v2x_msg_distribution_server_info", bytes.NewBuffer(body), nil, nil, nil, http.StatusBadRequest, V2xMsgDistributionServerPost)
	if err != nil {
		t.Fatalf(err.Error())
	}
	fmt.Println("Request done")

	/******************************
	 * back to initial state section
	 ******************************/

	terminateScenario()
}

func validateV2xMsgDistributionServerResponse(received V2xMsgDistributionServerInfo, expected V2xMsgDistributionServerInfo) bool {
	fmt.Println("validateV2xMsgDistributionServerResponse: received: ", received)
	fmt.Println("validateV2xMsgDistributionServerResponse: expected: ", expected)

	if len(received.LocationInfo) != len(expected.LocationInfo) {
		fmt.Println("len(received.LocationInfo) mismatch")
		return false
	}
	if len(received.V2xMsgDistributionServer) != len(expected.V2xMsgDistributionServer) {
		fmt.Println("len(received.LocationInfo) mismatch")
		return false
	} else {
		for i, v2xMsgDistributionServer := range received.V2xMsgDistributionServer {
			if v2xMsgDistributionServer.InfoProtocol != nil && received.V2xMsgDistributionServer[i].InfoProtocol != nil {
				if len(v2xMsgDistributionServer.InfoProtocol.MsgProtocol) != len(received.V2xMsgDistributionServer[i].InfoProtocol.MsgProtocol) {
					fmt.Println("len(v2xMsgDistributionServer.InfoProtocol.MsgProtocol) mismatch")
					return false
				}
				if v2xMsgDistributionServer.InfoProtocol.ProtImplementation != received.V2xMsgDistributionServer[i].InfoProtocol.ProtImplementation {
					fmt.Println("v2xMsgDistributionServer.InfoProtocol.ProtImplementation mismatch")
					return false
				}
			} else if v2xMsgDistributionServer.InfoProtocol != nil || received.V2xMsgDistributionServer[i].InfoProtocol != nil {
				fmt.Println("v2xMsgDistributionServer.InfoProtocol mismatch")
				return false
			}
			if v2xMsgDistributionServer.InfoConnection != nil && received.V2xMsgDistributionServer[i].InfoConnection != nil {
				if v2xMsgDistributionServer.InfoConnection.IpAddress != received.V2xMsgDistributionServer[i].InfoConnection.IpAddress {
					fmt.Println("v2xMsgDistributionServer.InfoConnection.IpAddress mismatch")
					return false
				}
				if v2xMsgDistributionServer.InfoConnection.PortNumber != received.V2xMsgDistributionServer[i].InfoConnection.PortNumber {
					fmt.Println("v2xMsgDistributionServer.InfoConnection.PortNumber mismatch")
					return false
				}
			} else if v2xMsgDistributionServer.InfoConnection != nil || received.V2xMsgDistributionServer[i].InfoConnection != nil {
				fmt.Println("v2xMsgDistributionServer.InfoConnection mismatch")
				return false
			}
		} // End of 'for' statement
	}

	fmt.Println("validateV2xMsgDistributionServerResponse: succeed")
	return true
}

func initializeVars() {
	mod.DbAddress = redisTestAddr
	redisAddr = redisTestAddr
+11 −2
Original line number Diff line number Diff line
@@ -60,7 +60,13 @@ type APIClient struct {

	// API Services

	V2xiApi *V2xiApiService
	QoSApi *QoSApiService

	QueriesApi *QueriesApiService

	SubscriptionApi *SubscriptionApiService

	V2XMsgApi *V2XMsgApiService
}

type service struct {
@@ -79,7 +85,10 @@ func NewAPIClient(cfg *Configuration) *APIClient {
	c.common.client = c

	// API Services
	c.V2xiApi = (*V2xiApiService)(&c.common)
	c.QoSApi = (*QoSApiService)(&c.common)
	c.QueriesApi = (*QueriesApiService)(&c.common)
	c.SubscriptionApi = (*SubscriptionApiService)(&c.common)
	c.V2XMsgApi = (*V2XMsgApiService)(&c.common)

	return c
}
+3 −3
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ func (amqp *message_broker_amqp) Init(tm *TrafficMgr) (err error) {
		log.Error(err.Error())
		return err
	}
	log.Info("url:%v\nscheme:%v host:%v Path:%v Port:%s", u, u.Scheme, u.Hostname(), u.Path, u.Port())
	log.Info("url:%v - scheme:%v - host:%v - Path:%v - Port:%s", u, u.Scheme, u.Hostname(), u.Path, u.Port())

	// TODO

Loading