#!/usr/bin/env python

#
# ARF - Augmented Reality Framework (ETSI ISG ARF)
#
# Copyright 2024 ETSI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Last change: August 2024
#

import time
import pathlib
import ssl
import asyncio
from pprint import pprint
from pydantic import ValidationError, validate_call

import websockets;
#from websockets.sync.client import connect
#import websocket;  // test

import ETSI.ARF.OpenAPI.WorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api
from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api

import ETSI.ARF.OpenAPI.WorldAnalysis
from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api
# Models (classes)
from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose
from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability

# Recommended: create a dedicated environment first
# conda create -n openapi
# conda activate openapi
# To install the World Storage OpenAPI: cd to the /CHANGE_PATH/generated folder, and then run "py-exe -m pip install ."

# Then, to run, first activate the environment:
# conda activate openapi
# and then run the Python script:
# python <this script>.py


# REST client configuration for the World Storage server.
# See configuration.py for a list of all supported configuration parameters.
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(
    host="https://etsi.hhi.fraunhofer.de",
)
# Alternative hosts:
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://localhost:44301")
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="http://localhost:61788")

# Websocket endpoint used by the websocket main loop.
webs_server = "ws://localhost:61788/ws"
#webs_server = "wss://localhost:44301/ws"
#webs_server = "wss://analysis.etsi.hhi.fraunhofer.de"  # final url

# Start-up banner, emitted with a single call: every positional argument is
# one output line (empty strings are the blank separator lines).
print(
    "",
    "ETSI ISG - ARF World Storage",
    "",
    "Simple request tests",
    "====================",
    "",
    "Using REST World Storage server: " + configuration.host,
    "Using Websockets server: " + webs_server,
    "",
    sep="\n",
)

# Shared module state. Functions in this file access these names through
# `global` declarations.
isTime = False              # True while the server is streaming "Time=" messages
isPose = False              # pose-subscription flag
running = True              # websocket main-loop flag; cleared on "SessionStop"
success = 0
#websocket = websockets.connect(my_ws_server)
myWebsocket = None
serverResponse = "None"
modulename = ""             # module name announced to the server; set by main()
msgToSend = "None"          # next outgoing message; the string "None" means nothing is pending

# certificate
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
print(f"Using PEM: {localhost_pem}")
print()

# See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections
webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if localhost_pem.is_file():
    webs_ssl_context.load_verify_locations(localhost_pem)
else:
    # A missing certificate must not abort the whole script at import time:
    # the context is only relevant for encrypted (wss://) endpoints.
    print(f"Warning: {localhost_pem} not found; no certificate was loaded into the TLS context.")
    print()
# Test the REST server availability (World Storage)
#
def CheckRESTServer(client):
    """Probe the World Storage REST server through its default endpoints.

    Calls ``get_ping``, ``get_version`` and ``get_admin`` in turn and prints
    each response, or the exception if the call fails.

    Args:
        client: The World Storage ``ApiClient`` used to issue the requests.

    Returns:
        True only if all three calls completed without raising the World
        Storage ``ApiException``; False otherwise.
    """
    # The bare name `default_api` is imported from both the WorldStorage and
    # the WorldAnalysis packages at the top of this file, and the later import
    # wins. Address the World Storage module explicitly so the probe really
    # uses the World Storage API class.
    api_instance = ETSI.ARF.OpenAPI.WorldStorage.api.default_api.DefaultApi(client)

    #
    # Endpoint: default
    #
    probes = (
        ("ping", "get_ping"),
        ("version", "get_version"),
        ("admin", "get_admin"),
    )

    success = 0
    for label, method_name in probes:
        try:
            # Test the server availability.
            api_response = getattr(api_instance, method_name)()
            print(f"Sending '{label}', got response: {api_response}")
            success += 1
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            # Name the call that actually failed (not always get_ping).
            print("Exception when calling DefaultApi->%s: %s\n" % (method_name, e))

    return success == len(probes)

#
# WebSockets helpers
#
async def WS_send(websocket, msg, isResponse):
    """Send *msg* over *websocket* and optionally wait for the reply.

    Args:
        websocket: Connection object providing an awaitable ``send``.
        msg: The message to transmit.
        isResponse: When truthy, wait for one message via ``WS_receive``
            and return it.

    Returns:
        The received message when *isResponse* is truthy, otherwise None.
    """
    print(f"[WS] Send to server: {msg}")
    await websocket.send(msg)

    # Fire-and-forget: nothing to wait for.
    if not isResponse:
        return None

    reply = await WS_receive(websocket)
    return reply

async def WS_receive(websocket):
    """Wait for the next message on *websocket*, report it, and return it.

    The message is handed to ``_OnReceiveText`` before being returned to
    the caller.
    """
    incoming = await websocket.recv()
    _OnReceiveText(incoming)
    return incoming
    
def _OnReceiveText(msg):
    """Report a message received from the server on standard output."""
    # Hook for handling incoming text; currently it is only echoed.
    report = f"[WS] Received from server: {msg}"
    print(report)

#
# WebSockets main loop
#
async def WS_ConnectAndLoop():
    """Connect to the websocket server, register this module and serve requests.

    Sequence:
      1. Open a connection to ``webs_server``.
      2. Send ``RegisterModule:<modulename>`` and wait for the reply.
      3. Announce the capabilities that belong to ``modulename``.
      4. Loop while ``running``: send the pending ``msgToSend`` (if any),
         wait for one server message and react to it.

    Reads and writes the module globals ``modulename``, ``msgToSend``,
    ``serverResponse``, ``running``, ``isTime`` and ``isPose``.

    Returns:
        None. Connection errors and any other exception are caught and
        printed, which ends the loop.
    """
    global modulename
    global msgToSend, serverResponse
    # isPose must be declared global as well: it is assigned in the loop, so
    # without the declaration it would be an unbound local when first read.
    global running, isTime, isPose

    # UUID of the current pose subscription; None until "SubscribePose:" arrives.
    subscribed_uuid = None

    try:
        async with websockets.connect(uri=webs_server) as websocket:

            # Register the user
            print(f"[WS] Send registration request to server for {modulename}")
            registrationResponse = await WS_send(websocket, "RegisterModule:" + modulename, isResponse=True)

            # Substring test; `not str.find(...)` is only true for a match at
            # index 0, which inverted the check.
            if "You are now registered" in registrationResponse:
                print(f"[WS] Registration of {modulename} was succesfull.")
            else:
                # NOTE(review): the session carries on after a failed
                # registration, as before -- confirm whether it should abort.
                print(f"[WS] Error: Registration of {modulename} was not successful!")
            print("[WS] Entering the websockets main loop...")

            # Prepare the list of capabilities
            #
            if modulename == "HHI-SpaceMap":

                enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
                    version="HHI-0.1",        # version
                    data_format="OTHER",      # type/company?
                )

                # Capability #1
                await _WS_sendCapability(
                    websocket,
                    trackable_type=ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MAP,
                    encoding=enc,
                    framerate=6,
                    latency=0.5,
                    accuracy=0.8,
                )

            elif modulename == "HHI-MeshDetection":

                enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
                    version="HHI-0.0.1",      # version
                    data_format="OTHER",      # type/company?
                )

                # Capability #1
                await _WS_sendCapability(
                    websocket,
                    trackable_type=ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MESH,
                    encoding=enc,
                    framerate=8,
                    latency=0.5,
                    accuracy=0.9,
                )

                # Capability #2
                await _WS_sendCapability(
                    websocket,
                    trackable_type=ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.FIDUCIAL_MARKER,
                    encoding=enc,
                    framerate=30,
                    latency=0.05,
                    accuracy=1,
                )

            #msgToSend = "Idle"
            msgToSend = "TimeStart" # test
            #msgToSend = "PoseStart"

            #
            # Websocket LOOP
            #
            while running:

                # Is there a message to be sent to the server?
                if msgToSend != "None":
                    await WS_send(websocket, msgToSend, isResponse=False)
                    msgToSend = "None"

                # Wait for a message
                serverMessage = await WS_receive(websocket)

                # Stop the loop and disconnect the server
                if serverMessage == "SessionStop":
                    isPose = False
                    running = False

                elif serverMessage.startswith("Time="):
                    isTime = True
                    # Everything after the first '=' is the time value.
                    server_time = serverMessage.partition('=')[2]
                    print(f"New server time is: {server_time}")

                elif serverMessage.startswith("TimeStop"):
                    isTime = False

                elif serverMessage.startswith("ConfigureFramerate:"):
                    # TODO: apply the requested configuration. The payload is
                    # everything after the first ':' (it was parsed but never
                    # used before).
                    pass

                elif serverMessage.startswith("GetPose:"):
                    # Message layout: "GetPose:<uuid>:<mode>". Only the uuid is
                    # used, so a missing mode field no longer ends the loop.
                    requested_uuid = serverMessage.split(':')[1]

                    # Send the pose
                    p = Pose()
                    msgToSend = "PoseNew:" + p.to_json()
                    print(f"Send pose for: uuid={requested_uuid}")

                elif serverMessage.startswith("SubscribePose:"):
                    subscribed_uuid = serverMessage.split(':')[1]
                    msgToSend = "PoseIsRegistered"
                    isPose = True

                elif serverMessage.startswith("UnsubscribePose:"):
                    isPose = False

                elif isPose and serverMessage == "RequestNextPose":
                    # Non-blocking pause: time.sleep() would stall the event loop.
                    await asyncio.sleep(1)
                    # Send the pose
                    p = Pose()
                    msgToSend = "PoseNew:" + p.to_json()
                    print(f"Send pose for: uuid={subscribed_uuid}")

                elif isTime:
                    # NOTE(review): this consumes one extra message that is only
                    # checked for "TimeStop" -- confirm this is intended.
                    serverResponse = await WS_receive(websocket)
                    if serverResponse == "TimeStop":
                        isTime = False
                        msgToSend = "Idle"

    except websockets.exceptions.ConnectionClosed as e:
        print(f"[WS] Connection closed with error: {e}")

    except Exception as e:
        # Top-level boundary of the websocket session: report and leave.
        print(f"[WS] An error occurred: {e}")


async def _WS_sendCapability(websocket, trackable_type, encoding, framerate, latency, accuracy):
    """Build one Capability and announce it to the server for ``modulename``."""
    cap = Capability()
    cap.trackable_type = trackable_type
    cap.encoding_information = encoding
    cap.framerate = framerate
    cap.latency = latency
    cap.accuracy = accuracy
    await WS_send(websocket, "Capabilities of:" + modulename + " Data=" + cap.to_json(), isResponse=False)

#async def main():
def main():
    """Script entry point: check the REST server, then run the websocket session.

    The optional first command-line argument selects the module to register:
    "2" selects "HHI-MeshDetection"; any other value, or no argument at all,
    selects "HHI-SpaceMap".

    The websocket session is only started when ``CheckRESTServer`` reports
    that the REST server answered all probes.
    """
    global modulename
    global messageToSend
    global myWebsocket

    # Local import: `sys` is not provided by the import block of this file.
    import sys

    #
    # Handle arguments
    #
    # Default to mode "1" so that running without arguments does not raise.
    mode = sys.argv[1] if len(sys.argv) > 1 else "1"

    #
    # Test rest api
    #
    api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration)
    isServerOk = CheckRESTServer(api_client)
    if isServerOk:
        print ("[REST] Connection to WS was succesfull.")

        #
        # Use this (websockets lib)
        #
        modulename = "HHI-MeshDetection" if mode == "2" else "HHI-SpaceMap"

        # NOTE(review): the websocket loop uses `msgToSend`, not this name --
        # confirm whether `messageToSend` is still needed.
        messageToSend = "GetLocalisation"
        print()
        print (f"[WS] Try connecting the WA server as { modulename}...")
        asyncio.run(WS_ConnectAndLoop())

    else:
        print ("[REST] Connection to WS was not succesfull!")

    #ip = input("Type <ENTER> to continue...")
    print ("Exiting ARF module.")

# Run only when executed as a script. main() is a plain function that starts
# the asyncio event loop itself, so it is called directly here.
if __name__ == "__main__":
    #asyncio.run(main())
    main()