#!/usr/bin/env python # # ARF - Augmented Reality Framework (ETSI ISG ARF) # # Copyright 2024 ETSI # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http:#www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Last change: August 2024 # import sys import time import pathlib import ssl import asyncio from pprint import pprint from pydantic import ValidationError, validate_call import websockets; #from websockets.sync.client import connect #import websocket; // test import ETSI.ARF.OpenAPI.WorldStorage from ETSI.ARF.OpenAPI.WorldStorage.api import default_api from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api import ETSI.ARF.OpenAPI.WorldAnalysis from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api # Models (classes) from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability # recommended to create enviroment # conda create -n openapi # conda activate openapi # to install the World Storage OpenAPI: cd to /CHANGE_PATH/generated folder, and then run "py-exe -m pip install ." # then to run, activate first enviroment: # conda activate openapi # and then run python script: # python .py # See configuration.py for a list of all supported configuration parameters. configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://etsi.hhi.fraunhofer.de") #configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://localhost:44301") #configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="http://localhost:61788") webs_server = "ws://localhost:61788/ws" #webs_server = "wss://localhost:44301/ws" #webs_server = "wss://analysis.etsi.hhi.fraunhofer.de" # final url print() print("ETSI ISG - ARF World Storage") print() print("Simple request tests") print("====================") print() print("Using REST World Storage server: " + configuration.host) print("Using Websockets server: " + webs_server) print() isTime = False running = True success = 0 #websocket = websockets.connect(my_ws_server) myWebsocket = None serverResponse = "None" modulename = "" msgToSend = "None" # certificate localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") print(f"Using PEM: {localhost_pem}") print() # See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) webs_ssl_context.load_verify_locations(localhost_pem) # # Test the REST server availability (World Storage) # def CheckRESTServer(client): success = 0 # Create an instance of the API class api_instance = default_api.DefaultApi(client) # # Endpoint: default # try: # Test the server availability. api_response = api_instance.get_ping() print("Sending 'ping', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_version() print("Sending 'version', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_admin() print("Sending 'admin', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) return success == 3 # # WebSockets helpers # async def WS_sendNow(websocket, msg, isResponse): print(f"[WS] Send to server: {msg}") await websocket.send(msg) if isResponse: return await WS_receive(websocket) return None async def WS_receive(websocket): #print(f"[WS] Waiting for response...") msgResponse = await websocket.recv() WS_OnReceiveText(msgResponse); return msgResponse def WS_OnReceiveText(msg): # Handle incoming text here? print(f"[WS] Received from server: {msg}") # # WebSockets main loop # async def WS_ConnectAndLoop(): global modulename global msgToSend, serverResponse global running, isTime try: ###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE}) #with connect(uri=webs_server, ssl_context=my_ssl_context) as websocket: async with websockets.connect(uri=webs_server) as websocket: #myWebsocket = websocket # Register the user print(f"[WS] Send registration request to server for {modulename}") registrationResponse = await WS_sendNow(websocket, "RegisterModule:" + modulename, isResponse=True) if not registrationResponse.find("You are now registered"): print(f"[WS] Error: Registration of {modulename} was not successful!") return else: print(f"[WS] Registration of {modulename} was succesfull.") print(f"[WS] Entering the websockets main loop...") # # Prepare the list of capabilities # if modulename == "HHI-SpaceMap": enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure( version = "HHI-0.1", # version data_format = "OTHER") # type/company? # Capability #1 cap1 = Capability() cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MAP cap1.encoding_information = enc cap1.framerate = 6 cap1.latency = 0.5 cap1.accuracy = 0.8 cap_json = cap1.to_json() await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False) elif modulename == "HHI-MeshDetection": enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure( version = "HHI-0.0.1", # version data_format = "OTHER") # type/company? # Capability #1 cap1 = Capability() cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MESH cap1.encoding_information = enc cap1.framerate = 8 cap1.latency = 0.5 cap1.accuracy = 0.9 cap_json = cap1.to_json() await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False) # Capability #2 cap2 = Capability() cap2.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.FIDUCIAL_MARKER cap2.encoding_information = enc cap2.framerate = 30 cap2.latency = 0.05 cap2.accuracy = 1 cap_json = cap2.to_json() await WS_sendNow(websocket, "Capabilities of:" + modulename + " Data=" + cap_json, isResponse=False) msgToSend = "Time" # test #msgToSend = "Idle" # Receive while running: # # WS LOOP # if msgToSend != "None": serverResponse = await WS_sendNow(websocket, msgToSend, isResponse=True) # Change state if msgToSend == "TimeStart": isTime = True msgToSend = "None" elif msgToSend == "Idle": time.sleep(1) '''serverResponse = await WS_sendNow(websocket, queryFirstJob, isResponse=True) if serverResponse != "Job not found": currentJob = Job.from_json(serverResponse) currentJob.state = JobState.INITIALISING currentJob.creation_time = None currentJob.last_write_time = None currentJob.progress = "0" currentJob.result = 'Scanning location' msgToSend = "UpdateJob(" + currentJob.to_json() + ")" ''' msgToSend = "Idle" '''elif msgToSend == "Busy": isJob = True time.sleep(0.25) progress = int(currentJob.progress) + 10 currentJob.progress = str(progress) if progress >= 100: if currentJob.state == JobState.INITIALISING: currentJob.state = JobState.RUNNING currentJob.result = 'Training and querying objects' currentJob.progress = "0" elif currentJob.state == JobState.RUNNING: currentJob.state = JobState.READY currentJob.result = 'Found x objects' msgToSend = "idle" isJob = False serverResponse = await WS_sendNow(websocket, "UpdateJob(" + currentJob.to_json() + ")", isResponse=True) ''' if isTime: serverResponse = await WS_receive(websocket) if serverResponse == "TimeStop": isTime = False msgToSend = "Idle" # Stop the loop and disconnect the server if msgToSend == "None" and serverResponse == "Stop": running = False except websockets.exceptions.ConnectionClosed as e: print(f"[WS] Connection closed with error: {e}") except Exception as e: print(f"[WS] An error occurred: {e}") #async def main(): def main(): global modulename global messageToSend global myWebsocket # # Handle arguments # mode = sys.argv[1] # # Test rest api # api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) isServerOk = CheckRESTServer(api_client) if isServerOk == True: print ("[REST] Connection to WS was succesfull.") # # Use this (websockets lib) # modulename = "HHI-SpaceMap" # mode = 1 if mode == "2": modulename = "HHI-MeshDetection" messageToSend = "GetLocalisation" print() print (f"[WS] Try connecting the WA server as { modulename}...") asyncio.run(WS_ConnectAndLoop()) else: print ("[REST] Connection to WS was not succesfull!") #ip = input("Type to continue...") print ("Exiting ARF module.") if __name__ == "__main__": #asyncio.run(main()) main()