Newer
Older
#!/usr/bin/env python
#
# ARF - Augmented Reality Framework (ETSI ISG ARF)
#
# Copyright 2024 ETSI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Fraunhofer HHI, SylR
# Last change: September 2024
import time
import pathlib
import ssl
import asyncio
from pprint import pprint
from pydantic import ValidationError, validate_call
import websockets;
#from websockets.sync.client import connect
#import websocket; // test
import ETSI.ARF.OpenAPI.WorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api
from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api
import ETSI.ARF.OpenAPI.WorldAnalysis
from ETSI.ARF.OpenAPI.WorldAnalysis.api import default_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import capabilities_api
from ETSI.ARF.OpenAPI.WorldAnalysis.api import pose_api
# Models (classes)
from ETSI.ARF.OpenAPI.WorldAnalysis.models.pose import Pose
from ETSI.ARF.OpenAPI.WorldAnalysis.models.capability import Capability
# Recommended: create a dedicated environment first
# conda create -n openapi
# conda activate openapi
# To install the World Storage OpenAPI: cd to the /CHANGE_PATH/generated folder, then run "py-exe -m pip install ."
# To run, first activate the environment:
# conda activate openapi
# and then run the Python script:
# python <this script>.py
#
#
# Web server (World Storage)
#
# NOTE: both assignments below are active. The later one (development)
# overrides the production URL; comment it out to target production.
# for production
webserver_url = "https://etsi.hhi.fraunhofer.de" # public
# for development
#webserver_url = "http://localhost:61788"
webserver_url = "https://localhost:44301" # secure
# See configuration.py for a list of all supported configuration parameters.
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host=webserver_url)
#
# WebSocket server (World Analysis)
#
# NOTE: same pattern as above. The last uncommented assignment wins, so the
# secure localhost URL is the one actually used.
# for production
websocket_url ="ws://192.168.20.29:8084/ws" # local network
#websocket_url = "wss://analysis.etsi.hhi.fraunhofer.de" # public
# for development
#websocket_url = "ws://localhost:61788/ws"
websocket_url = "wss://localhost:44301/ws" # secure
# Start-up banner: one entry per output line, empty strings are blank lines.
print("\n".join([
    "",
    "ETSI ISG - ARF World Storage",
    "",
    "Simple request tests",
    "====================",
    "",
    "Using REST World Storage server: " + configuration.host,
    "Using WebSockets server: " + websocket_url,
    "",
]))

# Module-level state shared by the REST and WebSocket helpers.
running = True            # flag declared global by the WebSocket routine
isPose = False            # set while a pose subscription is active
success = 0
serverResponse = "None"
secretToken = "dev"       # token for the World Storage REST API
#websocket = websockets.connect(my_ws_server)
myWebsocket = None
#
# Handle some certificates
#
# See here: https://websockets.readthedocs.io/en/stable/howto/quickstart.html#encrypt-connections
#localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
#webs_ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
#webs_ssl_context.load_verify_locations(localhost_pem)
#print(f"Using PEM: {localhost_pem}")
#print()
###################################################################
# REST -> World Storage
###################################################################
# Test the REST server availability (World Storage)
#
def CheckRESTServer(client):
    """Probe the World Storage REST server through its default endpoints.

    Calls ``get_ping``, ``get_version`` and ``get_admin`` in turn and prints
    each response. A failing call is reported and the remaining probes are
    still attempted.

    Args:
        client: World Storage ``ApiClient`` used to issue the requests.

    Returns:
        bool: True when all three probes succeeded, False otherwise.
    """
    # The file imports ``default_api`` from both the WorldStorage and the
    # WorldAnalysis packages, and the later import shadows the earlier one.
    # Spell out the World Storage module so the API class and the
    # ApiException caught below come from the same package.
    api_instance = ETSI.ARF.OpenAPI.WorldStorage.api.default_api.DefaultApi(client)

    # (label shown to the user, DefaultApi method to call)
    probes = (
        ("ping", "get_ping"),
        ("version", "get_version"),
        ("admin", "get_admin"),
    )

    success = 0
    for label, method_name in probes:
        try:
            # Test the server availability.
            api_response = getattr(api_instance, method_name)()
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            # Name the endpoint that actually failed.
            print(f"[REST] Exception when calling DefaultApi->{method_name}: {e}\n")
        else:
            print(f"Sending '{label}', got response: {api_response}")
            success += 1

    # The caller compares the result with True, so return a real bool.
    return success == len(probes)
#
# Get the list of trackables
#
def REST_GetTrackables():
    """Fetch all trackables from the World Storage server and print them.

    Opens a short-lived ``ApiClient`` on the module-level ``configuration``,
    queries the trackables endpoint with the module-level ``secretToken`` and
    prints one line per returned item. An ``ApiException`` is reported on
    stdout instead of being raised.
    """
    # Enter a context with an instance of the API client
    with ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) as api_client:
        api_instance_t = trackables_api.TrackablesApi(api_client)
        try:
            # Only the request itself can raise ApiException.
            list_response = api_instance_t.get_trackables(token=secretToken)
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            print("[REST] Exception when calling TrackablesApi->get_trackables: %s\n" % e)
            return

        print(f"Querying Trackables: got list with {len(list_response)} items:")
        for item in list_response:
            # "!s" forces str(), matching the explicit str() calls used before
            # (format() and str() can differ for str-based enums).
            print(f" UUID: {item.uuid!s} Name: {item.name} (Type: {item.trackable_type!s})")
###################################################################
# WebSockets -> World Analysis
###################################################################
async def WS_send(websocket, msg, isResponse):
    """Send ``msg`` over ``websocket`` and optionally wait for the reply.

    Returns whatever ``WS_receive`` returns when ``isResponse`` is truthy,
    otherwise None.
    """
    print(f"[WS] Send to server: {msg}")
    await websocket.send(msg)

    # Fire-and-forget unless the caller asked for the server's answer.
    if not isResponse:
        return None

    reply = await WS_receive(websocket)
    return reply
async def WS_receive(websocket):
    """Wait for the next message from the server, log it and return it.

    Args:
        websocket: Connected object exposing an awaitable ``recv()``.

    Returns:
        The message returned by ``websocket.recv()``.
    """
    #print(f"[WS] Waiting for response...")
    msgResponse = await websocket.recv()
    # Handle incoming text here?
    print(f"[WS] Received from server: {msgResponse}")
    # Callers use the received text, so hand it back to them.
    return msgResponse
#
# WebSockets main loop
#
# Connects to the World Analysis WebSocket server at `websocket_url`,
# registers this client under the module-level `modulename`, announces the
# capabilities for that module and then reacts to messages from the server.
# Connection-closed and generic exceptions are caught at the end and printed.
#
# NOTE(review): this copy of the function looks incomplete. `running` is
# declared global and set to False below, but no `while running:` header is
# visible, and the first `elif serverMessage...` has no visible preceding
# `if`. Confirm the loop header and the stop condition against the original.
# NOTE(review): `isTime` is declared global but not used in the visible code.
async def WS_ConnectAndLoop():
global running, isTime, isPose
try:
###ws = websockets.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
#with connect(uri=websocket
#_server, ssl_context=my_ssl_context) as websocket:
async with websockets.connect(uri=websocket_url) as websocket:
#myWebsocket = websocket
# Register the user
print(f"[WS] Send registration request to server for {modulename}")
registrationResponse = await WS_send(websocket, "RegisterModule:" + modulename, isResponse=True)
# NOTE(review): str.find() returns -1 (truthy) when the text is absent and
# 0 (falsy) when it is found at the start, so this test looks inverted;
# `"..." not in registrationResponse` is probably what was meant.
# The success message below is printed in either case.
if not registrationResponse.find("You are now registered"):
print(f"[WS] Error: Registration of {modulename} was not successful!")
print(f"[WS] Registration of {modulename} was succesfull.")
print(f"[WS] Entering the websockets main loop...")
#
# Announce the capabilities that belong to the selected module name.
if modulename == "HHI-SpaceMap":
enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
version = "HHI-0.1", # version
data_format = "OTHER") # type/company?
# Capability #1
cap1 = Capability()
cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MAP
cap1.encoding_information = enc
cap1.framerate = 6
cap1.latency = 0.5
cap1.accuracy = 0.8
cap_json = cap1.to_json()
await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False)
elif modulename == "HHI-MeshDetection":
enc = ETSI.ARF.OpenAPI.WorldAnalysis.models.EncodingInformationStructure(
version = "HHI-0.0.1", # version
data_format = "OTHER") # type/company?
# Capability #1
cap1 = Capability()
cap1.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.MESH
cap1.encoding_information = enc
cap1.framerate = 8
cap1.latency = 0.5
cap1.accuracy = 0.9
cap_json = cap1.to_json()
await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False)
# Capability #2 (reuses the encoding information of capability #1)
cap2 = Capability()
cap2.trackable_type = ETSI.ARF.OpenAPI.WorldAnalysis.models.TrackableType.FIDUCIAL_MARKER
cap2.encoding_information = enc
cap2.framerate = 30
cap2.latency = 0.05
cap2.accuracy = 1
cap_json = cap2.to_json()
await WS_send(websocket, "Capabilities=" + cap_json, isResponse=False)
# First message sent after the capabilities; "None" is the string sentinel
# assigned once the pending message has been sent.
msgToSend = "Idle"
# Is there a message to be sent to the server?
# NOTE(review): presumably guarded by a check against the "None" sentinel
# inside the main loop - confirm.
await WS_send(websocket, msgToSend, isResponse=False)
msgToSend = "None"
# Wait for a message
serverMessage = await WS_receive(websocket)
#
# Management
#
# Stop the loop and disconnect the server
# NOTE(review): the condition selecting this branch is not visible here.
isPose = False
running = False
elif serverMessage.startswith("SubscribePose:"):
# The text after the first ':' is taken as the uuid to report poses for.
uuid = serverMessage.split(':')[1]
msgToSend = "PoseIsRegistered"
isPose = True
elif serverMessage.startswith("UnsubscribePose:"):
uuid = serverMessage.split(':')[1]
isPose = False
elif serverMessage.startswith("ConfigureFramerate:"):
# The value is parsed but not used anywhere in the visible code.
fps = serverMessage.split(':')[1]
#
# Special msg for simulating a global user's position
#
elif serverMessage.startswith("CurrentUserPose="):
userPose = serverMessage.split('=')[1]
print(f"New user current position is: {userPose }")
#
# Pose requests
#
elif serverMessage.startswith("GetPose:"): # 2 args
uuid2 = serverMessage.split(':')[1]
mode = serverMessage.split(':')[2]
# WS_SendPose is a plain (non-async) function and is not passed the
# websocket, so this call does not send anything over the connection.
WS_SendPose(uuid2)
elif isPose == True and serverMessage == "RequestNextPose":
# NOTE(review): time.sleep() blocks the event loop inside a coroutine;
# `await asyncio.sleep(1)` is the non-blocking equivalent.
time.sleep(1)
# `uuid` is only assigned in the Subscribe/Unsubscribe branches above.
WS_SendPose(uuid)
except websockets.exceptions.ConnectionClosed as e:
print(f"[WS] Connection closed with error: {e}")
except Exception as e:
print(f"[WS] An error occurred: {e}")
def WS_SendPose(uuid):
    """Build the "NewPose:" message for the trackable identified by ``uuid``.

    This function has no access to the websocket, so it does not transmit
    anything itself. The serialized message is returned (instead of being
    discarded) so that a caller can send it.

    Args:
        uuid: Identifier of the trackable the pose is reported for; only used
            in the log output.

    Returns:
        str: ``"NewPose:"`` followed by the JSON form of a default ``Pose``.
    """
    # Default-constructed pose.
    # NOTE(review): presumably to be filled with real relocalization data - confirm.
    p = Pose()
    # Get the reloc infos?
    msgToSend = "NewPose:" + p.to_json()
    print(f"Send pose for: uuid={ uuid }")
    return msgToSend
# `sys` is needed for the command-line arguments and is not imported at the
# top of the file.
import sys

#
# Handle arguments
#
# Optional first argument: "2" selects HHI-MeshDetection, anything else
# (or no argument at all) selects the default module, HHI-SpaceMap.
mode = sys.argv[1] if len(sys.argv) > 1 else "1"
#
# Test rest api
#
api_client = ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration)
isServerOk = CheckRESTServer(api_client)
# Only query the trackables when the server answered the availability checks.
if isServerOk:
    print ("[REST] Connection to WS was succesfull.")
    REST_GetTrackables()
#
# Use this (websockets lib)
#
modulename = "HHI-SpaceMap" # mode = 1
if mode == "2":
    modulename = "HHI-MeshDetection"
messageToSend = "GetLocalisation"
print()
print (f"[WS] Try connecting the WA server as { modulename}...")
asyncio.run(WS_ConnectAndLoop())