#!/usr/bin/env python
#
# ARF - Augmented Reality Framework (ETSI ISG ARF)
#
# Copyright 2024 ETSI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Last change: August 2024
#
"""Simple request tests against a REST server and a websockets server.

The script first calls the ping/version/admin endpoints of the configured
REST server. When all three succeed it connects to the websockets server,
registers a module name, announces the module capabilities and then answers
the text messages sent by the server until "SessionStop" is received.

Usage: python <this_script>.py [mode]
    mode "2" registers as "HHI-MeshDetection"; any other value (or no
    argument) registers as "HHI-SpaceMap".
"""

import sys
import time
import pathlib
import ssl
import asyncio

from pprint import pprint
from pydantic import ValidationError, validate_call

import websockets
#from websockets.sync.client import connect
#import websocket   # test

import ETSI.ARF.OpenAPI.WorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api
# The WorldAnalysis import further down rebinds the name `default_api`, so the
# World Storage variant is additionally kept under an unambiguous alias.
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api as world_storage_default_api
from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api

import ETSI.ARF.OpenAPI.WorldAnalysis
from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api

# Models (classes)
from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose
from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability

# It is recommended to create an environment first:
#   conda create -n openapi
#   conda activate openapi
# To install the World Storage OpenAPI: cd to the /CHANGE_PATH/generated
# folder, and then run "py-exe -m pip install ."
# Then, to run, first activate the environment:
#   conda activate openapi
# and then run the python script:
#   python <this_script>.py

# See configuration.py for a list of all supported configuration parameters.
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://etsi.hhi.fraunhofer.de")
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://localhost:44301")
#configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="http://localhost:61788")

webs_server = "ws://localhost:61788/ws"
#webs_server = "wss://localhost:44301/ws"
#webs_server = "wss://analysis.etsi.hhi.fraunhofer.de"  # final url

print()
print("ETSI ISG - ARF World Storage")
print()
print("Simple request tests")
print("====================")
print()
print("Using REST World Storage server: " + configuration.host)
print("Using Websockets server: " + webs_server)
print()

# Shared state of the websockets loop. The string "None" (not the None
# object) is the sentinel meaning "nothing to send" / "no response yet".
isTime = False
isPose = False
running = True
success = 0

#websocket = websockets.connect(my_ws_server)
myWebsocket = None
serverResponse = "None"
modulename = ""
msgToSend = "None"

# certificate
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
print(f"Using PEM: {localhost_pem}")
print()

# See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections
# NOTE(review): this context is built here but is not passed to
# websockets.connect() in WS_ConnectAndLoop -- confirm whether it should be.
webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
webs_ssl_context.load_verify_locations(localhost_pem)


#
# Test the REST server availability (World Storage)
#
def CheckRESTServer(client):
    """Call the ping, version and admin endpoints of the REST server.

    Each call is attempted independently; an ApiException is reported on
    stdout and does not prevent the remaining calls.

    Args:
        client: API client handed to the DefaultApi constructor.

    Returns:
        True when all three calls succeeded, False otherwise.
    """
    # Create an instance of the API class. The aliased module is used because
    # the plain name `default_api` refers to the WorldAnalysis module here.
    api_instance = world_storage_default_api.DefaultApi(client)

    #
    # Endpoint: default
    #
    probes = (
        ("ping", "get_ping"),
        ("version", "get_version"),
        ("admin", "get_admin"),
    )

    succeeded = 0
    for label, method_name in probes:
        try:
            # Test the server availability.
            api_response = getattr(api_instance, method_name)()
            print(f"Sending '{label}', got response: {api_response}")
            succeeded += 1
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            # Name the call that actually failed.
            print(f"Exception when calling DefaultApi->{method_name}: {e}\n")

    return succeeded == len(probes)


#
# WebSockets helpers
#
async def WS_send(websocket, msg, isResponse):
    """Send msg and, if isResponse is true, wait for and return one reply.

    Returns:
        The received message when isResponse is true, otherwise None.
    """
    print(f"[WS] Send to server: {msg}")
    await websocket.send(msg)
    if isResponse:
        return await WS_receive(websocket)
    return None


async def WS_receive(websocket):
    """Wait for the next message, log it and return it."""
    #print(f"[WS] Waiting for response...")
    msgResponse = await websocket.recv()
    _OnReceiveText(msgResponse)
    return msgResponse


def _OnReceiveText(msg):
    """Log a message received from the server."""
    # Handle incoming text here?
    print(f"[WS] Received from server: {msg}")


def _CapabilitySpecs(name):
    """Return (encoding version, capability tuples) for a module name.

    Each tuple is (trackable type, framerate, latency, accuracy). Unknown
    module names yield (None, []).
    """
    trackable_types = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType
    if name == "HHI-SpaceMap":
        return "HHI-0.1", [
            (trackable_types.MAP, 6, 0.5, 0.8),
        ]
    if name == "HHI-MeshDetection":
        return "HHI-0.0.1", [
            (trackable_types.MESH, 8, 0.5, 0.9),
            (trackable_types.FIDUCIAL_MARKER, 30, 0.05, 1),
        ]
    return None, []


async def _SendCapabilities(websocket, name):
    """Build and send, one by one, the capabilities declared for name."""
    version, specs = _CapabilitySpecs(name)
    if not specs:
        return

    enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
        version=version,        # version
        data_format="OTHER")    # type/company?

    for trackable_type, framerate, latency, accuracy in specs:
        cap = Capability()
        cap.trackable_type = trackable_type
        cap.encoding_information = enc
        cap.framerate = framerate
        cap.latency = latency
        cap.accuracy = accuracy
        cap_json = cap.to_json()
        await WS_send(websocket, "Capabilities of:" + name + " Data=" + cap_json, isResponse=False)


#
# WebSockets main loop
#
async def WS_ConnectAndLoop():
    """Connect, register the module, announce capabilities and serve messages.

    Reads the module-level `modulename` and `webs_server`; updates the
    module-level `msgToSend`, `serverResponse`, `running`, `isTime` and
    `isPose`. The loop ends when the server sends "SessionStop", when the
    connection is closed, or when any other exception occurs (both are
    reported on stdout).
    """
    global modulename
    global msgToSend, serverResponse
    # isPose is assigned below, so it has to be declared global as well;
    # otherwise reading it before the first assignment raises
    # UnboundLocalError.
    global running, isTime, isPose

    # UUID from the last (un)subscribe message; None until one arrives.
    subscribed_uuid = None

    try:
        ###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        #with connect(uri=webs_server, ssl_context=my_ssl_context) as websocket:
        async with websockets.connect(uri=webs_server) as websocket:
            #myWebsocket = websocket

            # Register the user
            print(f"[WS] Send registration request to server for {modulename}")
            registrationResponse = await WS_send(websocket, "RegisterModule:" + modulename, isResponse=True)
            # str.find() returns -1 (truthy) on a miss and 0 (falsy) on a
            # match at the start, so a membership test is used instead.
            if "You are now registered" not in registrationResponse:
                print(f"[WS] Error: Registration of {modulename} was not successful!")
                return
            print(f"[WS] Registration of {modulename} was succesfull.")

            print(f"[WS] Entering the websockets main loop...")

            #
            # Prepare the list of capabilities
            #
            await _SendCapabilities(websocket, modulename)

            #msgToSend = "Idle"
            msgToSend = "TimeStart"  # test
            #msgToSend = "PoseStart"

            #
            # Websocket LOOP
            #
            while running:
                # Is a message to be sent to the server?
                if msgToSend != "None":
                    await WS_send(websocket, msgToSend, isResponse=False)
                    msgToSend = "None"

                # Wait for a message
                serverMessage = await WS_receive(websocket)

                if serverMessage == "SessionStop":
                    # Stop the loop and disconnect the server
                    isPose = False
                    running = False

                elif serverMessage.startswith("Time="):
                    isTime = True
                    # Everything after the first '=' is the time value.
                    server_time = serverMessage.partition("=")[2]
                    print(f"New server time is: {server_time}")

                elif serverMessage.startswith("TimeStop"):
                    isTime = False

                elif serverMessage.startswith("ConfigureFramerate:"):
                    # TODO: the payload after the first ':' is not applied yet.
                    pass

                elif serverMessage.startswith("GetPose:"):
                    requested_uuid = serverMessage.split(":")[1]
                    # Send the pose
                    p = Pose()
                    msgToSend = "PoseNew:" + p.to_json()
                    print(f"Send pose for: uuid={requested_uuid}")

                elif serverMessage.startswith("SubscribePose:"):
                    subscribed_uuid = serverMessage.split(":")[1]
                    msgToSend = "PoseIsRegistered"
                    isPose = True

                elif serverMessage.startswith("UnsubscribePose:"):
                    subscribed_uuid = serverMessage.split(":")[1]
                    isPose = False

                elif isPose and serverMessage == "RequestNextPose":
                    # Non-blocking pause: time.sleep() would stall the
                    # event loop for the whole second.
                    await asyncio.sleep(1)
                    # Send the pose
                    p = Pose()
                    msgToSend = "PoseNew:" + p.to_json()
                    print(f"Send pose for: uuid={subscribed_uuid}")

                elif isTime:
                    serverResponse = await WS_receive(websocket)
                    if serverResponse == "TimeStop":
                        isTime = False
                        msgToSend = "Idle"

    except websockets.exceptions.ConnectionClosed as e:
        print(f"[WS] Connection closed with error: {e}")
    except Exception as e:
        # Top-level boundary of the websockets session: report and leave.
        print(f"[WS] An error occurred: {e}")


#async def main():
def main():
    """Check the REST server and, if it answers, run the websockets session.

    The optional first command line argument selects the module name:
    "2" selects "HHI-MeshDetection", anything else "HHI-SpaceMap".
    """
    global modulename
    global messageToSend
    global myWebsocket

    #
    # Handle arguments
    #
    # Fall back to mode "1" when the script is started without arguments
    # instead of failing with an IndexError.
    mode = sys.argv[1] if len(sys.argv) > 1 else "1"

    #
    # Test rest api
    #
    api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration)
    isServerOk = CheckRESTServer(api_client)

    if isServerOk:
        print ("[REST] Connection to WS was succesfull.")

        #
        # Use this (websockets lib)
        #
        modulename = "HHI-SpaceMap"  # mode = 1
        if mode == "2":
            modulename = "HHI-MeshDetection"

        # NOTE(review): the websockets loop reads `msgToSend`, not
        # `messageToSend`, so this value is currently never sent -- confirm
        # the intended variable before renaming.
        messageToSend = "GetLocalisation"

        print()
        print (f"[WS] Try connecting the WA server as {modulename}...")
        asyncio.run(WS_ConnectAndLoop())
    else:
        print ("[REST] Connection to WS was not succesfull!")

    #ip = input("Type to continue...")
    print ("Exiting ARF module.")


if __name__ == "__main__":
    #asyncio.run(main())
    main()