#!/usr/bin/env python # # ARF - Augmented Reality Framework (ETSI ISG ARF) # # Copyright 2024 ETSI # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http:#www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Last change: August 2024 # import time import pathlib import ssl import asyncio from pprint import pprint import websockets; #from websockets.sync.client import connect #import websocket; // test import ETSI.ARF.OpenAPI.WorldStorage from ETSI.ARF.OpenAPI.WorldStorage.api import default_api from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api # recommended to create enviroment # conda create -n openapi # conda activate openapi # to install the World Storage OpenAPI: cd to /CHANGE_PATH/generated folder, and then run "py-exe -m pip install ." # then to run, activate first enviroment: # conda activate openapi # and then run python script: # python .py # See configuration.py for a list of all supported configuration parameters. configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://etsi.hhi.fraunhofer.de") #configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="https://localhost:44301") #configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host="http://localhost:61788") print() print("ETSI ISG - ARF World Storage") print() print("Simple request tests") print("====================") print() print("Using WS server: " + configuration.host) print() isTime = False running = True success = 0 #websocket = websockets.connect(my_ws_server) myWebsocket = None serverResponse = "None" username = "" msgToSend = "None" # certificate localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") print(f"Using PEM: {localhost_pem}") print() # See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections my_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) my_ssl_context.load_verify_locations(localhost_pem) arf_ws_server = "ws://localhost:61788/ws" #arf_ws_server = "wss://localhost:44301/ws" #arf_ws_server = "ws://localhost:8765" # # Test the REST server availability # def CheckRESTServer(client): success = 0 # Create an instance of the API class api_instance = default_api.DefaultApi(client) # # Endpoint: default # try: # Test the server availability. api_response = api_instance.get_ping() print("Sending 'ping', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_version() print("Sending 'version', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) try: # Test the server availability. api_response = api_instance.get_admin() print("Sending 'admin', got response: " + api_response) success += 1 except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e: print("Exception when calling DefaultApi->get_ping: %s\n" % e) return success == 3 # # WebSockets helpers # async def WS_sendNow(websocket, msg, isResponse): print(f"[WS] Send to server: {msg}") await websocket.send(msg) if isResponse: return await WS_receive(websocket) return None async def WS_receive(websocket): #print(f"[WS] Waiting for response...") msgResponse = await websocket.recv() WS_OnReceiveText(msgResponse); return msgResponse def WS_OnReceiveText(msg): # Handle incoming text here? print(f"[WS] Received from server: {msg}") # # WebSockets main loop # async def WS_ConnectAndLoop(): global username global msgToSend, serverResponse global running, isTime try: ###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE}) #with connect(uri=my_ws_server, ssl_context=my_ssl_context) as websocket: async with websockets.connect(uri=arf_ws_server) as websocket: #myWebsocket = websocket # Register the user print(f"[WS] Send registration request to server for {username}") registrationResponse = await WS_sendNow(websocket, "module:" + username, isResponse=True) if not registrationResponse.find("You are now registered"): print(f"[WS] Error: Registration of {username} was not successful!") return else: print(f"[WS] Registration of {username} was succesfull.") print(f"[WS] Entering the websockets main loop...") # Prepare the list of capabilities await WS_sendNow(websocket, "Capabilities:" + username, isResponse=True) msgToSend = "Time" #msgToSend = "Idle" # Receive while running: # # WS LOOP # if msgToSend != "None": serverResponse = await WS_sendNow(websocket, msgToSend, isResponse=True) # Change state if msgToSend == "Time": isTime = True msgToSend = "None" elif msgToSend == "Idle": time.sleep(1) '''serverResponse = await WS_sendNow(websocket, queryFirstJob, isResponse=True) if serverResponse != "Job not found": currentJob = Job.from_json(serverResponse) currentJob.state = JobState.INITIALISING currentJob.creation_time = None currentJob.last_write_time = None currentJob.progress = "0" currentJob.result = 'Scanning location' msgToSend = "UpdateJob(" + currentJob.to_json() + ")" ''' msgToSend = "Idle" '''elif msgToSend == "Busy": isJob = True time.sleep(0.25) progress = int(currentJob.progress) + 10 currentJob.progress = str(progress) if progress >= 100: if currentJob.state == JobState.INITIALISING: currentJob.state = JobState.RUNNING currentJob.result = 'Training and querying objects' currentJob.progress = "0" elif currentJob.state == JobState.RUNNING: currentJob.state = JobState.READY currentJob.result = 'Found x objects' msgToSend = "idle" isJob = False serverResponse = await WS_sendNow(websocket, "UpdateJob(" + currentJob.to_json() + ")", isResponse=True) ''' if isTime: serverResponse = await WS_receive(websocket) if serverResponse == "TimeStop": isTime = False msgToSend = "Idle" # Stop the loop and disconnect the server if msgToSend == "None" and serverResponse == "Stop": running = False except websockets.exceptions.ConnectionClosed as e: print(f"[WS] Connection closed with error: {e}") except Exception as e: print(f"[WS] An error occurred: {e}") ''' # WebSocket-Client (test) def on_message(ws, message): global isJob, messageToSend if message == "Stop": ws.close() else: print(f"Received from server: {message}") if isJob: isJob = False print(f"Unregister me!") ws.send(f"UnregisterClient") print(f"Waiting for response...") else: isJob = True; messageToSend = "StartJob" print(f"Send to server: {messageToSend}") ws.send(messageToSend) print(f"Waiting for response...") def on_error(ws, error): print(f"Encountered error: {error}") def on_close(ws, close_status_code, close_msg): print("Connection closed.") def on_open(ws): print() print(f"Send NEW registration request to server for: {username}") ws.send(username) print(f"Waiting for response...") ''' #async def main(): def main(): global username global messageToSend global myWebsocket # # Test rest api # api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) isServerOk = CheckRESTServer(api_client) if isServerOk == True: print ("[REST] Connection to WS was succesfull.") else: print ("[REST] Connection to WS was not succesfull!") # # Use this (websockets lib) # username = "HHI" messageToSend = "GetLocalisation" print() print (f"[WS] Try connecting the WA server as { username}...") asyncio.run(WS_ConnectAndLoop()) #ip = input("Type to continue...") # # Test websockets 2 # '''ws = websocket.WebSocketApp(my_ws_server, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close) #ws.on_open = on_open ws.run_forever()''' print ("Exiting ARF module.") if __name__ == "__main__": #asyncio.run(main()) main()