Skip to content
WorldAnalysisWrapper.py 11.6 KiB
Newer Older
#!/usr/bin/env python

#
# ARF - Augmented Reality Framework (ETSI ISG ARF)
#
# Copyright 2024 ETSI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Last change: August 2024
#

import time
import pathlib
import ssl
import asyncio
from pprint import pprint
from pydantic import ValidationError, validate_call

import websockets;
#from websockets.sync.client import connect
#import websocket;  // test

import ETSI.ARF.OpenAPI.WorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api
from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api

import ETSI.ARF.OpenAPI.WorldAnalysis
from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api
# Models (classes)
from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose
from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability

# recommended to create enviroment
# conda create -n openapi
# conda activate openapi
# to install the World Storage OpenAPI: cd to /CHANGE_PATH/generated folder, and then run "py-exe -m pip install ."

# then to run, activate first enviroment:
# conda activate openapi
# and then run python script:
# python <this script>.py


# See configuration.py for a list of all supported configuration parameters.
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://etsi.hhi.fraunhofer.de")
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://localhost:44301")
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="http://localhost:61788")

webs_server = "ws://localhost:61788/ws"
#webs_server = "wss://localhost:44301/ws"
#webs_server = "wss://analysis.etsi.hhi.fraunhofer.de"  # final url

print()
print("ETSI ISG - ARF World Storage")
print()
print("Simple request tests")
print("====================")
print()
print("Using REST World Storage server: " + configuration.host)
print("Using Websockets server: " + webs_server)
print()

isTime = False
running = True
success = 0
#websocket = websockets.connect(my_ws_server)
myWebsocket = None
serverResponse = "None"
modulename = ""
msgToSend = "None"

# certificate
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
print(f"Using PEM: {localhost_pem}")
print()

# See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections
webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
webs_ssl_context.load_verify_locations(localhost_pem)
# Test the REST server availability (World Storage)
#
def CheckRESTServer(client):
    success = 0

    # Create an instance of the API class
    api_instance = default_api.DefaultApi(client)

    #
    # Endpoint: default
    #
    try:
        # Test the server availability.
        api_response = api_instance.get_ping()
        print("Sending 'ping', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)

    try:
        # Test the server availability.
        api_response = api_instance.get_version()
        print("Sending 'version', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)

    try:
        # Test the server availability.
        api_response = api_instance.get_admin()
        print("Sending 'admin', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)

    return success == 3

#
# WebSockets helpers
#
async def WS_sendNow(websocket, msg, isResponse):
    print(f"[WS] Send to server: {msg}")
    await websocket.send(msg)
    if isResponse: 
        return await WS_receive(websocket)
    return None

async def WS_receive(websocket):
    #print(f"[WS] Waiting for response...")
    msgResponse = await websocket.recv()
    WS_OnReceiveText(msgResponse);
    return msgResponse
    
def WS_OnReceiveText(msg):
    # Handle incoming text here?
    print(f"[WS] Received from server: {msg}")

#
# WebSockets main loop
#
async def WS_ConnectAndLoop():
    global modulename
    global msgToSend, serverResponse
    global running, isTime

    try:
        ###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        #with connect(uri=webs_server, ssl_context=my_ssl_context) as websocket:

        async with websockets.connect(uri=webs_server) as websocket:
            
            #myWebsocket = websocket

            # Register the user
            print(f"[WS] Send registration request to server for {modulename}")
            registrationResponse = await WS_sendNow(websocket, "RegisterModule:" + modulename, isResponse=True)
            
            if not registrationResponse.find("You are now registered"):
                print(f"[WS] Error: Registration of {modulename} was not successful!")
                print(f"[WS] Registration of {modulename} was succesfull.")
                print(f"[WS] Entering the websockets main loop...")

            # Prepare the list of capabilities
            #
            if modulename == "HHI-SpaceMap":
                
                enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
                version = "HHI-0.1",        # version
                data_format = "OTHER")      # type/company?

                # Capability #1
                cap1 = Capability()
                cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MAP
                cap1.encoding_information = enc
                cap1.framerate = 6
                cap1.latency = 0.5
                cap1.accuracy = 0.8
                cap_json = cap1.to_json()
                await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False)

            elif modulename == "HHI-MeshDetection":
                
                enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
                version = "HHI-0.0.1",      # version
                data_format = "OTHER")      # type/company?

                # Capability #1
                cap1 = Capability()
                cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MESH
                cap1.encoding_information = enc
                cap1.framerate = 8
                cap1.latency = 0.5
                cap1.accuracy = 0.9
                cap_json = cap1.to_json()
                await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False)

                # Capability #2
                cap2 = Capability()
                cap2.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.FIDUCIAL_MARKER
                cap2.encoding_information = enc
                cap2.framerate = 30
                cap2.latency = 0.05
                cap2.accuracy = 1
                cap_json = cap2.to_json()
                await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False)

            msgToSend = "Time" # test
            #msgToSend = "Idle"

            # Receive
            while running:
                #
                # WS LOOP
                #
                if msgToSend != "None":
                    serverResponse = await WS_sendNow(websocket, msgToSend, isResponse=True)
                    
                    # Change state
                    if msgToSend == "TimeStart":
                        isTime = True
                        msgToSend = "None"

                    elif msgToSend == "Idle":
                        time.sleep(1)
                        '''serverResponse = await WS_sendNow(websocket, queryFirstJob, isResponse=True)
                        if serverResponse != "Job not found":
                            currentJob = Job.from_json(serverResponse)
                            currentJob.state = JobState.INITIALISING
                            currentJob.creation_time = None
                            currentJob.last_write_time = None
                            currentJob.progress = "0"
                            currentJob.result = 'Scanning location'
                            msgToSend = "UpdateJob(" + currentJob.to_json() + ")"   '''         
                        msgToSend = "Idle"

                    '''elif msgToSend == "Busy":
                        isJob = True
                        time.sleep(0.25)
                        progress = int(currentJob.progress) + 10
                        currentJob.progress = str(progress)
                        if progress >= 100:
                            if currentJob.state == JobState.INITIALISING:                                
                                currentJob.state = JobState.RUNNING
                                currentJob.result = 'Training and querying objects'
                                currentJob.progress = "0"
                            elif currentJob.state == JobState.RUNNING:
                                currentJob.state = JobState.READY
                                currentJob.result = 'Found x objects'
                                msgToSend = "idle"
                                isJob = False
                                
                        serverResponse = await WS_sendNow(websocket, "UpdateJob(" + currentJob.to_json() + ")", isResponse=True)
'''
                if isTime:
                    serverResponse = await WS_receive(websocket)
                    if serverResponse == "TimeStop": 
                        isTime = False
                        msgToSend = "Idle"

                # Stop the loop and disconnect the server
                if msgToSend == "None" and serverResponse == "Stop":
                    running = False

    except websockets.exceptions.ConnectionClosed as e:
        print(f"[WS] Connection closed with error: {e}")
    
    except Exception as e:
        print(f"[WS] An error occurred: {e}")

#async def main():
def main():
    global modulename
    global messageToSend
    global myWebsocket

    #
    # Handle arguments
    #
    mode = sys.argv[1]

    #
    # Test rest api
    #
    api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration)
    isServerOk = CheckRESTServer(api_client)
    if isServerOk == True:
        print ("[REST] Connection to WS was succesfull.")

        #
        # Use this (websockets lib)
        #
        modulename = "HHI-SpaceMap" # mode = 1
        if mode == "2": modulename = "HHI-MeshDetection"

        messageToSend = "GetLocalisation"
        print()
        print (f"[WS] Try connecting the WA server as { modulename}...")
        asyncio.run(WS_ConnectAndLoop())
    
    else:
        print ("[REST] Connection to WS was not succesfull!")

    #ip = input("Type <ENTER> to continue...")
    print ("Exiting ARF module.")

if __name__ == "__main__":
    #asyncio.run(main())
    main()