Commit b1996331 authored by Sylvain Renault's avatar Sylvain Renault
Browse files

Integration of websocket subscriptions

parent 24e858d2
Loading
Loading
Loading
Loading
+128 −2
Original line number Diff line number Diff line
@@ -763,6 +763,12 @@ namespace ETSI.ARF.OpenAPI.WorldAnalysis
            return SubscribeToPoseAsync(token, sessionID, body, System.Threading.CancellationToken.None);
        }

        // SylR
        public virtual System.Threading.Tasks.Task<SubscriptionMultiple> SubscribeToPoseAsync(string token, string sessionID, SubscriptionMultipleRequest body)
        {
            return SubscribeToPoseAsync(token, sessionID, body, System.Threading.CancellationToken.None);
        }

        /// <summary>
        /// Subscribe to collect the pose of an AR device, an Anchor or a Trackable
        /// </summary>
@@ -776,6 +782,12 @@ namespace ETSI.ARF.OpenAPI.WorldAnalysis
            return System.Threading.Tasks.Task.Run(async () => await SubscribeToPoseAsync(token, sessionID, body, System.Threading.CancellationToken.None)).GetAwaiter().GetResult();
        }
    
        // SylR
        public virtual SubscriptionMultiple SubscribeToPose(string token, string sessionID, SubscriptionMultipleRequest body)
        {
            return System.Threading.Tasks.Task.Run(async () => await SubscribeToPoseAsync(token, sessionID, body, System.Threading.CancellationToken.None)).GetAwaiter().GetResult();
        }

        /// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
        /// <summary>
        /// Subscribe to collect the pose of an AR device, an Anchor or a Trackable
@@ -898,6 +910,120 @@ namespace ETSI.ARF.OpenAPI.WorldAnalysis
            }
        }

        // SylR
        public virtual async System.Threading.Tasks.Task<SubscriptionMultiple> SubscribeToPoseAsync(string token, string sessionID, SubscriptionMultipleRequest body, System.Threading.CancellationToken cancellationToken)
        {
            if (body == null)
                throw new System.ArgumentNullException("body");

            var client_ = _httpClient;
            var disposeClient_ = false;
            try
            {
                using (var request_ = new System.Net.Http.HttpRequestMessage())
                {

                    if (token != null)
                        request_.Headers.TryAddWithoutValidation("token", ConvertToString(token, System.Globalization.CultureInfo.InvariantCulture));

                    if (sessionID != null)
                        request_.Headers.TryAddWithoutValidation("sessionID", ConvertToString(sessionID, System.Globalization.CultureInfo.InvariantCulture));
                    var json_ = Newtonsoft.Json.JsonConvert.SerializeObject(body, _settings.Value);
                    var content_ = new System.Net.Http.StringContent(json_);
                    content_.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json");
                    request_.Content = content_;
                    request_.Method = new System.Net.Http.HttpMethod("POST");
                    request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));

                    var urlBuilder_ = new System.Text.StringBuilder();

                    // Operation Path: "pose/subscriptions"
                    urlBuilder_.Append("pose/subscriptions");

                    PrepareRequest(client_, request_, urlBuilder_);

                    var url_ = urlBuilder_.ToString();
                    request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute);

                    PrepareRequest(client_, request_, url_);

                    var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
                    var disposeResponse_ = true;
                    try
                    {
                        var headers_ = new System.Collections.Generic.Dictionary<string, System.Collections.Generic.IEnumerable<string>>();
                        foreach (var item_ in response_.Headers)
                            headers_[item_.Key] = item_.Value;
                        if (response_.Content != null && response_.Content.Headers != null)
                        {
                            foreach (var item_ in response_.Content.Headers)
                                headers_[item_.Key] = item_.Value;
                        }

                        ProcessResponse(client_, response_);

                        var status_ = (int)response_.StatusCode;
                        if (status_ == 200)
                        {
                            var objectResponse_ = await ReadObjectResponseAsync<SubscriptionMultiple>(response_, headers_, cancellationToken).ConfigureAwait(false);
                            if (objectResponse_.Object == null)
                            {
                                throw new ApiException("Response was null which was not expected.", status_, objectResponse_.Text, headers_, null);
                            }
                            return objectResponse_.Object;
                        }
                        else
                        if (status_ == 400)
                        {
                            var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
                            var result_ = (string)System.Convert.ChangeType(responseData_, typeof(string));
                            throw new ApiException<string>("Bad request.", status_, responseData_, headers_, result_, null);
                        }
                        else
                        if (status_ == 403)
                        {
                            var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
                            var result_ = (string)System.Convert.ChangeType(responseData_, typeof(string));
                            throw new ApiException<string>("Not allowed.", status_, responseData_, headers_, result_, null);
                        }
                        else
                        if (status_ == 404)
                        {
                            var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
                            var result_ = (string)System.Convert.ChangeType(responseData_, typeof(string));
                            throw new ApiException<string>("Not found.", status_, responseData_, headers_, result_, null);
                        }
                        else
                        if (status_ == 405)
                        {
                            var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false);
                            var result_ = (string)System.Convert.ChangeType(responseData_, typeof(string));
                            throw new ApiException<string>("Not supported.", status_, responseData_, headers_, result_, null);
                        }
                        else
                        {
                            var objectResponse_ = await ReadObjectResponseAsync<Error>(response_, headers_, cancellationToken).ConfigureAwait(false);
                            if (objectResponse_.Object == null)
                            {
                                throw new ApiException("Response was null which was not expected.", status_, objectResponse_.Text, headers_, null);
                            }
                            throw new ApiException<Error>("Unexpected error.", status_, objectResponse_.Text, headers_, objectResponse_.Object, null);
                        }
                    }
                    finally
                    {
                        if (disposeResponse_)
                            response_.Dispose();
                    }
                }
            }
            finally
            {
                if (disposeClient_)
                    client_.Dispose();
            }
        }

        /// <summary>
        /// Get information about a subscription
        /// </summary>
@@ -1927,8 +2053,8 @@ namespace ETSI.ARF.OpenAPI.WorldAnalysis
        /// <summary>
        /// List of modes representing the context of the Relocalization information (AR device to WorldAnchor/Trackable or WorldAnchor/Trackable to AR device)
        /// </summary>
        [Newtonsoft.Json.JsonProperty("mode", Required = Newtonsoft.Json.Required.Always, ItemConverterType = typeof(Newtonsoft.Json.Converters.StringEnumConverter))]
        public System.Collections.Generic.ICollection<Mode_WorldAnalysis> Mode { get; set; } = new System.Collections.ObjectModel.Collection<Mode_WorldAnalysis>();
        [Newtonsoft.Json.JsonProperty("modes", Required = Newtonsoft.Json.Required.Always, ItemConverterType = typeof(Newtonsoft.Json.Converters.StringEnumConverter))]
        public System.Collections.Generic.ICollection<Mode_WorldAnalysis> Modes { get; set; } = new System.Collections.ObjectModel.Collection<Mode_WorldAnalysis>();

        /// <summary>
        /// Subscription validity delay in millisecond
+1 −1
Original line number Diff line number Diff line
@@ -151,7 +151,7 @@ public interface WorldAnalysisInterface
    /// </summary>
    /// <param name="subscriptionUUID">id of the subscription</param>
    /// /// <returns>The unsubscription has been performed or not</returns>
    public InformationSubscriptionResult UnSubscribeToPose(Guid subscriptionUUID);
    public InformationSubscriptionResult UnsubscribeFromPose(Guid subscriptionUUID);

    #endregion

+49 −115
Original line number Diff line number Diff line
@@ -7,11 +7,14 @@ using ETSI.ARF.WorldAnalysis;
using static WorldAnalysisInterface;
using ETSI.ARF.WorldAnalysis.REST;

using WebSocketSharp;
using UnityEngine.Events;

//Implementation of the WorldAnalysis interface
public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
public partial class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
{
    [Serializable]
    public class StringEvent : UnityEvent<string> { }

    //
    // Inspector variables
    //
@@ -25,13 +28,15 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
    [Space(8)]
    public bool isDebug = false;

    //[Serializable]
    //public class StringEvent : UnityEvent<string> { }
    //public StringEvent webSocketMessage;

    //
    // Private members
    //
    private WorldAnalysisClient apiClient;          // For sync calls
    private WorldAnalysisClient apiClientAsync;     // For async calls
    private WebSocketSharp.WebSocket webSocket;     // For WebSockets
    private bool websocketConnected = false;

    //
    // Management of subscriptions
@@ -91,6 +96,11 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
        }
    }

    private void OnDestroy()
    {
        WebSocketClient_Close();
    }

    #endregion

    #region Test methods
@@ -117,95 +127,6 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
    }
    #endregion

    #region Communication system
    private void CreateWebHookServer()
    {
        throw new Exception("[REST] CreateWebHookServer(): Not implemented!");
    }

    private void DestroyWebHookServer()
    {
        return;
    }

    public WebSocket OpenWebSocketClient(string url)
    {
        webSocket = new WebSocketSharp.WebSocket(url);

        //
        // Define standard callbacks
        //
        webSocket.OnOpen += (sender, e) =>
        {
            Debug.Log("[WS] Connected");
            websocketConnected = true;
            webSocket.Send("RegisterClient:UnitySceneManagement");
        };
        webSocket.OnClose += (sender, e) =>
        {
            Debug.Log("[WS] Disconnected");
            websocketConnected = false;
        };
        webSocket.OnError += (sender, e) => Debug.Log("[WS] Error!");
        webSocket.OnMessage += (sender, e) => HandleWebSocketClient(e.Data);
        webSocket.Connect();

        return webSocket;
    }

    private void OnDestroy()
    {
        // State: red
        CloseWebSocketClient();
    }

    private void CloseWebSocketClient()
    {
        if (websocketConnected)
        {
            webSocket.Send("UnregisterClient");
            webSocket.Close();
        }
    }

    bool ok = false;
    public void HandleWebSocketClient(string data)
    {
        Debug.Log("[WS] Receiving: " + data);

        if (data.Contains("You are now registered"))
        {
            ok = true;
            if (isDebug) webSocket.Send("PoseStart:10"); // test
        }
        else if (data == "PoseStop")
        {
            //SetColor(Color.yellow);
        }
        else if (ok)
        {
            if (data.Contains("estimationState"))
            {
                // Handle pose
                ETSI.ARF.OpenAPI.WorldAnalysis.Pose p = JsonUtility.FromJson<ETSI.ARF.OpenAPI.WorldAnalysis.Pose>(data);
                Debug.Log("[WS][Pose] State: " + p.EstimationState.ToString());

                PoseEstimationResult res = p.EstimationState == PoseEstimationState.OK ? PoseEstimationResult.OK : PoseEstimationResult.FAILURE;

                // Search the corresponding callbacks
                foreach (var item in m_subscriptionsPoses.Values)
                {
                    if (p.Uuid == item.uuidTarget)
                    {
                        item.callback(res, p);
                    }
                }
            }
        }
    }

    #endregion

    #region Lifecycle
    /// <summary>
    /// Check the validity of all subscriptions and delete one if needed
@@ -270,8 +191,8 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
        {
            for (int i = 0; i < uuids.Length; i++)
            {
                PoseEstimationResult poseResul = new PoseEstimationResult();
                resul[i] = poseResul;
                PoseEstimationResult poseResult = new PoseEstimationResult();
                resul[i] = poseResult;
                poses[i] = posesList[i];
            }
            return resul;
@@ -288,20 +209,29 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
        // Todo: Maintain the callback to the subscription id
        // Get capabilities?
        // Get reloc info?
        subscriptionUUID = Guid.Empty; // default

        SubscriptionSingleRequest body = new SubscriptionSingleRequest();
        body.Target = uuid;
        body.Mode = mode;        
        body.Validity = validity;
        body.WebhookUrl = callback == null ? "" : "https:\\..."; // empty -> app will use websockets (client)!
        body.WebhookUrl = callback != null ? "" : "https:\\..."; // empty -> app will use websockets (client)!

        // Get subscription info from the REST server
        SubscriptionSingle response = apiClient.SubscribeToPose(token, sessionID, body);
        subscriptionUUID = response.Uuid;
        validity = response.Validity;

        //SubscriptionMultipleRequest body_m = new SubscriptionMultipleRequest();
        //body_m.Targets = new List<object>();
        //body_m.Mode = new List<Mode_WorldAnalysis>();
        //body_m.Validity = validity;
        //body_m.WebhookUrl = callback == null ? "" : "https:\\..."; // empty -> app will use websockets (client)!
        //SubscriptionMultiple response = apiClient.SubscribeToPose(token, sessionID, body_m);

        // We add the subscription
        SubscriptionInfo sub = new SubscriptionInfo();
        sub.uuid = response.Uuid;
        sub.uuid = subscriptionUUID;
        sub.timeValidity = Time.time + (validity / 1000.0f);
        sub.pose = new ETSI.ARF.OpenAPI.WorldAnalysis.Pose();
        sub.pose.Mode = mode;
@@ -311,24 +241,27 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface

        if (!string.IsNullOrEmpty(response.WebhookUrl))
        {
            CloseWebSocketClient();
            // Close other communication channel
            WebSocketClient_Close();

            // todo: create a REST server so that the WA server can send pose update to it
            // How to auto-generate the C# REST server for pose for Unity?
            CreateWebHookServer();
            // Create a REST server so that the WA server can send pose update to it
            // How to auto-generate the C# REST webhook server for pose for Unity?
            string webhookUrl = response.WebhookUrl;
            WebHookServer_Create(webhookUrl);
        }
        else
        {
            DestroyWebHookServer();
            // Close other communication channel
            WebHookServer_Close();

            // todo: Open the websocket?
            string websocketUrl = response.WebsocketUrl;
            if (isDebug) websocketUrl = "ws://localhost:61788/ws"; // for tests
            // Open the websocket
            string websocketUrl = "ws://" + response.WebsocketUrl + "/ws";
            //if (isDebug) websocketUrl = "ws://localhost:61788/ws"; // for tests

            if (string.IsNullOrEmpty(websocketUrl))
            if (!string.IsNullOrEmpty(websocketUrl))
            {
                // Create the WebSockets client here (NOT in the scene scripts)
                if (!websocketConnected) OpenWebSocketClient(websocketUrl);
                if (!websocketConnected) WebSocketClient_Create(websocketUrl);
            }
            else throw new Exception("[REST] No valid WebSockets URL in server reponse.");
        }
@@ -418,15 +351,16 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
            //
            if (oldCB != null && callback == null && !string.IsNullOrEmpty(response.WebhookUrl))
            {
                CloseWebSocketClient();
                WebSocketClient_Close();

                // todo: create a REST server so that the WA server can send pose update to it
                // Create a REST server so that the WA server can send pose update to it
                // How to auto-generate the C# REST server for pose for Unity?
                CreateWebHookServer();
                string webhookUrl = response.WebhookUrl;
                WebHookServer_Create(webhookUrl);
            }
            else if (oldCB == null && callback != null && string.IsNullOrEmpty(response.WebhookUrl))
            {
                DestroyWebHookServer();
                WebHookServer_Close();

                // todo: Open the websocket?
                string websocketUrl = response.WebsocketUrl;
@@ -435,7 +369,7 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
                if (string.IsNullOrEmpty(websocketUrl))
                {
                    // Create the WebSockets client here (NOT in the scene scripts)
                    if (!websocketConnected) OpenWebSocketClient(websocketUrl);
                    if (!websocketConnected) WebSocketClient_Create(websocketUrl);
                }
                else throw new Exception("[REST] No valid WebSockets URL in server reponse.");
            }
@@ -445,7 +379,7 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
        return InformationSubscriptionResult.UNKNOWN_ID;
    }

    public InformationSubscriptionResult UnSubscribeToPose(Guid subscriptionUUID)
    public InformationSubscriptionResult UnsubscribeFromPose(Guid subscriptionUUID)
    {
        if (m_subscriptionsPoses.ContainsKey(subscriptionUUID))
        {
@@ -455,7 +389,7 @@ public class WorldAnalysisREST : MonoBehaviour, WorldAnalysisInterface
            if (m_subscriptionsPoses.Count == 0)
            {
                // Close the connection via websockets
                CloseWebSocketClient();
                WebSocketClient_Close();
            }
            return InformationSubscriptionResult.OK;
        }