#!/usr/bin/env python # # ARF - Augmented Reality Framework (ETSI ISG ARF) # # Copyright 2024 ETSI # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http:#www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Author: Fraunhofer HHI, SylR # Last change: September 2024 # import sys import time import pathlib import ssl import asyncio from pprint import pprint from pydantic import ValidationError, validate_call import websockets; #from websockets.sync.client import connect #import websocket; // test import ETSI.ARF.OpenAPI.WorldStorage from ETSI.ARF.OpenAPI.WorldStorage.api import default_api from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api import ETSI.ARF.OpenAPI.WorldAnalysis from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api # Models (classes) from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability # recommended to create enviroment # conda create -n openapi # conda activate openapi # to install the World Storage OpenAPI: cd to /CHANGE_PATH/generated folder, and then run "py-exe -m pip install ." # then to run, activate first enviroment: # conda activate openapi # and then run python script: # python .py # # Web server (World Storage) # # for production webserver_url = "https://etsi.hhi.fraunhofer.de" # public # for development #webserver_url = "http://localhost:61788" webserver_url = "https://localhost:44301" # secure # See configuration.py for a list of all supported configuration parameters. configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host=webserver_url) # # WebSocket server (World Analysis) # # for production websocket_url ="ws://192.168.20.29:8084/ws" # local network #websocket_url = "wss://analysis.etsi.hhi.fraunhofer.de" # public # for development #websocket_url = "ws://localhost:61788/ws" websocket_url = "wss://localhost:44301/ws" # secure print() print("ETSI ISG - ARF World Storage") print() print("Simple request tests") print("====================") print() print("Using REST World Storage server: " + configuration.host) print("Using WebSockets server: " + websocket_url) print() running = True success = 0 #websocket = websockets.connect(my_ws_server) myWebsocket = None serverResponse = "None" modulename = "" msgToSend = "None" isPose = False secretToken = "dev" # # Handle some certificates # # See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections #localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") #webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) #webs_ssl_context.load_verify_locations(localhost_pem) #print(f"Using PEM: {localhost_pem}") #print() ################################################################### # REST -> World Storage ################################################################### # # Test the REST server availability (World Storage) # def CheckRESTServer(client): success = 0 # Create an instance of the API class api_instance = default_api.DefaultApi(client) # # Endpoint: default # try: # Test the server availability. api_response = api_instance.get_ping() print("Sending 'ping', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_version() print("Sending 'version', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_admin() print("Sending 'admin', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e) return success == 3 # # Get the list of trackables # def REST_GetTrackables(): global firstJob # Enter a context with an instance of the API client with ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) as api_client: api_instance_t = trackables_api.TrackablesApi(api_client) try: list_response = api_instance_t.get_trackables(token="dev") print("Querying Trackables: got list with " + str(len(list_response)) + " items:") for item in list_response: print(" UUID: " + str(item.uuid) + " Name: " + item.name + " (Type: " + str(item.trackable_type) + ")") except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("[REST] Exception when calling TrackablesApi->get_trackables: %s\n" % e) ################################################################### # WebSockets -> World Analysis ################################################################### # # WebSockets helpers # async def WS_send(websocket, msg, isResponse): print(f"[WS] Send to server: {msg}") await websocket.send(msg) if isResponse: return await WS_receive(websocket) return None async def WS_receive(websocket): #print(f"[WS] Waiting for response...") msgResponse = await websocket.recv() _OnReceiveText(msgResponse); return msgResponse def _OnReceiveText(msg): # Handle incoming text here? print(f"[WS] Received from server: {msg}") # # WebSockets main loop # async def WS_ConnectAndLoop(): global modulename global msgToSend, serverResponse global running, isTime, isPose try: ###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE}) #with connect(uri=websocket #_server, ssl_context=my_ssl_context) as websocket: async with websockets.connect(uri=websocket_url) as websocket: #myWebsocket = websocket # Register the user print(f"[WS] Send registration request to server for {modulename}") registrationResponse = await WS_send(websocket, "RegisterModule:" + modulename, isResponse=True) if not registrationResponse.find("You are now registered"): print(f"[WS] Error: Registration of {modulename} was not successful!") return else: print(f"[WS] Registration of {modulename} was succesfull.") print(f"[WS] Entering the websockets main loop...") # # Prepare the list of capabilities # if modulename == "HHI-SpaceMap": enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure( version = "HHI-0.1", # version data_format = "OTHER") # type/company? # Capability #1 cap1 = Capability() cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MAP cap1.encoding_information = enc cap1.framerate = 6 cap1.latency = 0.5 cap1.accuracy = 0.8 cap_json = cap1.to_json() await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False) elif modulename == "HHI-MeshDetection": enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure( version = "HHI-0.0.1", # version data_format = "OTHER") # type/company? # Capability #1 cap1 = Capability() cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MESH cap1.encoding_information = enc cap1.framerate = 8 cap1.latency = 0.5 cap1.accuracy = 0.9 cap_json = cap1.to_json() await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False) # Capability #2 cap2 = Capability() cap2.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.FIDUCIAL_MARKER cap2.encoding_information = enc cap2.framerate = 30 cap2.latency = 0.05 cap2.accuracy = 1 cap_json = cap2.to_json() await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False) msgToSend = "Idle" #msgToSend = "PoseStart" # # Websocket LOOP # while running: # Is s message to be send to the server? if msgToSend != "None": await WS_send(websocket, msgToSend, isResponse=False) msgToSend = "None" # Wait for a message serverMessage = await WS_receive(websocket) # # Management # if serverMessage == "SessionStop": # Stop the loop and disconnect the server isPose = False running = False elif serverMessage.startswith("SubscribePose:"): uuid = serverMessage.split(':')[1] msgToSend = "PoseIsRegistered" isPose = True elif serverMessage.startswith("UnsubscribePose:"): uuid = serverMessage.split(':')[1] isPose = False elif serverMessage.startswith("ConfigureFramerate:"): fps = serverMessage.split(':')[1] # # Special msg for simulating a global user's position # elif serverMessage.startswith("CurrentUserPose="): userPose = serverMessage.split('=')[1] print(f"New user current position is: {userPose }") # # Pose requests # elif serverMessage.startswith("GetPose:"): # 2 args uuid2 = serverMessage.split(':')[1] mode = serverMessage.split(':')[2] WS_SendPose(uuid2) elif isPose == True and serverMessage == "RequestNextPose": time.sleep(1) WS_SendPose(uuid) except websockets.exceptions.ConnectionClosed as e: print(f"[WS] Connection closed with error: {e}") except Exception as e: print(f"[WS] An error occurred: {e}") def WS_SendPose(uuid): # Send the pose p = Pose() # Get the reloc infos? msgToSend = "NewPose:" + p.to_json() print(f"Send pose for: uuid={ uuid }") #async def main(): def main(): global modulename global messageToSend global myWebsocket # # Handle arguments # mode = sys.argv[1] # # Test rest api # api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) isServerOk = CheckRESTServer(api_client) if isServerOk == True: print ("[REST] Connection to WS was succesfull.") REST_GetTrackables() # # Use this (websockets lib) # modulename = "HHI-SpaceMap" # mode = 1 if mode == "2": modulename = "HHI-MeshDetection" messageToSend = "GetLocalisation" print() print (f"[WS] Try connecting the WA server as { modulename}...") asyncio.run(WS_ConnectAndLoop()) else: print ("[REST] Connection to WS was not succesfull!") #ip = input("Type to continue...") print ("Exiting ARF module.") if __name__ == "__main__": #asyncio.run(main()) main()