Commit 8fe0daf6 authored by Sylvain Renault's avatar Sylvain Renault
Browse files

Python test for communication with the world storage api.

parent a22a1993
Loading
Loading
Loading
Loading
+2 −3
Original line number Diff line number Diff line
@@ -76,14 +76,13 @@ conda activate openapi
Install the World Storage OpenAPI:

```
cd client\generated
pip install .
pip install .\client\generated
```

In case of not having the pip installed for your cml you can use following line:

```
py -m pip install .
py -m pip install .\client\generated
```

## Running the test script
+203 −0
Original line number Diff line number Diff line
@@ -17,17 +17,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Last change: Mai 2024
# Author: Fraunhofer HHI, SylR
# Last change: September 2024
#

import time
from pprint import pprint
from pydantic import ValidationError, validate_call

import ETSI.ARF.OpenAPI.WorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.api import default_api
from ETSI.ARF.OpenAPI.WorldStorage.api import trackables_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_anchors_api
from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api
from ETSI.ARF.OpenAPI.WorldStorage.api import relocalization_information_api

# Models (classes)
from ETSI.ARF.OpenAPI.WorldStorage.models.uuid_and_mode import UuidAndMode
from ETSI.ARF.OpenAPI.WorldStorage.models.mode_world_storage import ModeWorldStorage
from ETSI.ARF.OpenAPI.WorldStorage.models.capability import Capability
from ETSI.ARF.OpenAPI.WorldStorage.models.reloc_object import RelocObject
from ETSI.ARF.OpenAPI.WorldStorage.models.relocalization_information import RelocalizationInformation
from ETSI.ARF.OpenAPI.WorldStorage.models.relocalization_informations import RelocalizationInformations

# recommended to create enviroment
# conda create -n openapi
@@ -39,11 +50,16 @@ from ETSI.ARF.OpenAPI.WorldStorage.api import world_links_api
# and then run python script:
# python <this script>.py

# for production
webserver_url = "https://etsi.hhi.fraunhofer.de"  # public

# for development
#webserver_url = "http://localhost:61788"
webserver_url = "https://localhost:44301"  # secure


# See configuration.py for a list of all supported configuration parameters.
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(
    host="https://etsi.hhi.fraunhofer.de"
)
configuration = ETSI.ARF.OpenAPI.WorldStorage.Configuration(host=webserver_url)

print()
print("ETSI ISG - ARF World Storage")
@@ -72,7 +88,7 @@ def CheckRESTServer(client) -> bool:
        print("Sending 'ping', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)
        print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e)

    try:
        # Test the server availability.
@@ -80,7 +96,7 @@ def CheckRESTServer(client) -> bool:
        print("Sending 'version', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)
        print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e)

    try:
        # Test the server availability.
@@ -88,10 +104,15 @@ def CheckRESTServer(client) -> bool:
        print("Sending 'admin', got response: " + api_response)
        success += 1
    except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
        print("Exception when calling DefaultApi->get_ping: %s\n" % e)
        print("[REST] Exception when calling DefaultApi->get_ping: %s\n" % e)

    return success == 3

#
# Main
#
devToken = "dev"

# Enter a context with an instance of the API client
with ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) as api_client:
 
@@ -99,46 +120,83 @@ with ETSI.ARF.OpenAPI.WorldStorage.ApiClient(configuration) as api_client:
    isServerOk = CheckRESTServer(api_client)

    if isServerOk == True:
        print ("REST Connection was succesfull.")
        print ("[REST] Connection was succesfull.")

        #
        # Endpoint: trackables
        #
        print()
        print("2. Testing trackables endpoints")
        print("2. Testing Trackables endpoint:")
        api_instance_t = trackables_api.TrackablesApi(api_client)
        try:
            list_response = api_instance_t.get_trackables()
            print("Querying Trackables: got list with " + str(len(list_response)) + " items:")
            for item in list_response:
                print("   UUID: " + str(item.uuid) + " Name: " + item.name + " (Type: " + str(item.trackable_type) + ")")
            trackables = api_instance_t.get_trackables(token=devToken).trackables
            print("\tQuerying Trackables: got list with " + str(len(trackables)) + " items:")
            for item in trackables:
                print("\t\tUUID: " + str(item.uuid) + " Name: " + item.name + " (Type: " + str(item.trackable_type) + ")")
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            print("Exception when calling TrackablesApi->get_trackables: %s\n" % e)
            print("[REST] Exception when calling TrackablesApi->get_trackables: %s\n" % e)

        print()
        print("3. Testing anchors endpoints")
        print("3. Testing World Anchors endpoint:")
        api_instance_wa = world_anchors_api.WorldAnchorsApi(api_client)
        try:
            list_response = api_instance_wa.get_world_anchors()
            print("Querying World Anchors: got list with " + str(len(list_response)) + " items:")
            for item in list_response:
                print("   UUID: " + str(item.uuid) + " Name: " + item.name )
            anchors = api_instance_wa.get_world_anchors(token=devToken).world_anchors
            print("\tQuerying World Anchors: got list with " + str(len(anchors)) + " items:")
            for item in anchors:
                print("\t\tUUID: " + str(item.uuid) + " Name: " + item.name )
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            print("Exception when calling WorldAnchorsApi->get_world_anchors: %s\n" % e)
            print("[REST] Exception when calling WorldAnchorsApi->get_world_anchors: %s\n" % e)

        """ 
        print()
        print("4. Testing links endpoints")
        print("4. Testing World Links endpoint:")
        api_instance_wl = world_links_api.WorldLinksApi(api_client)
        try:
            list_response = api_instance_wl.get_world_links()
            print("Querying World Links: got list with " + str(len(list_response)) + " items:")
            for item in list_response:
                print("   Link from UUID: " + str(item.uuid_from) + " to UUID: " + str(item.uuidto))
            links = api_instance_wl.get_world_links(token=devToken).world_links
            print("\tQuerying World Links: got list with " + str(len(links)) + " items:")
            for item in links:
                print("\t\tUUID: " + str(item.uuid) + " Link from UUID: " + str(item.uuid_from) + " to UUID: " + str(item.uuidto))
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            print("[REST] Exception when calling WorldLinksApi->get_world_links: %s\n" % e)
        """
    
        print()
        print("5. Testing the Relocalization Info endpoint:")
        api_instance_reloc = relocalization_information_api.RelocalizationInformationApi(api_client)
        
        # create a fake reloc request
        enc = ETSI.ARF.OpenAPI.WorldStorage.models.EncodingInformationStructure(
                version = "HHI-0.1",        # version
                data_format = "OTHER")      # type/company?
        
        # Capability #1
        cap1 = Capability()
        cap1.trackable_type = ETSI.ARF.OpenAPI.WorldStorage.models.TrackableType.MAP
        cap1.encoding_information = enc
        cap1.framerate = 6
        cap1.latency = 0.5
        cap1.accuracy = 0.8
        
        arCapabilities = [ cap1 ]        

        uuid2track = UuidAndMode()
        #uuid2track.uuid = trackables[0].uuid
        #uuid2track.uuid = trackables[0].name # work around
        uuid2track.uuid = "cb0630db-2151-47d3-85d4-09c079cd8be1"
        uuid2track.mode = ModeWorldStorage.REQUEST_TO_TRACKABLES

        arObjects = [ uuid2track ]

        try:
            print("\tQuerying reloc info for UUID: " + str(uuid2track.uuid))

            reloc_response = api_instance_reloc.get_relocalization_information(token=devToken, capabilities=arCapabilities, uuids=arObjects).world_links
            print("\t\tGot:" + reloc_response.reloc_info[0].reloc_objects[0].mode)
        except ETSI.ARF.OpenAPI.WorldStorage.ApiException as e:
            print("Exception when calling WorldLinksApi->get_world_links: %s\n" % e)
            print("[REST] Exception when calling RelocalizationInformationApi->get_relocalization_information: %s\n" % e)

    else:
        print ("REST Connection was not succesfull!")
        print ("[REST] Connection was not succesfull!")


print ()

openapi @ 57348800

Original line number Diff line number Diff line
Subproject commit b639a02180c2b5e301c77483b3a2fa645ba94169
Subproject commit 5734880080c52572af65fe6d2703228ed6ee5653