# test_te_service.py
# Simple script to test gRPC calls to the TE service.
# First, get the TE service IP using:
# > kubectl -n tfs get services
# Then set that IP as the `host` passed to TEServiceClient in main() and run with:
# > PYTHONPATH=./src python test_te_service.py

import json, sys
from common.proto.context_pb2 import ConfigActionEnum, Service, ServiceStatusEnum, ServiceTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from service.client.TEServiceClient import TEServiceClient

# Example of a Service message rendered as JSON; the request built below
# has the same shape:
#  {"name": "", 
#     "service_config": {
#        "config_rules": [
#           {
#             "action": "CONFIGACTION_SET",
#              "custom": {
#                 "resource_key": "/lsp-fw",
#                 "resource_value": "{\n\"binding_label\": 1111,\n\"symbolic_name\": \"foo\"\n}"}},
#           {
#              "action": "CONFIGACTION_SET",
#              "custom": {
#                 "resource_key": "/lsp-bw",
#                 "resource_value": "{\n\"binding_label\": 6666,\n\"symbolic_name\": \"bar\"\n}"}}]},
#        "service_constraints": [],
#        "service_endpoint_ids": [
#           {"device_id": {"device_uuid": {"uuid": "RT1"}}, "endpoint_uuid": {"uuid": "eth-src"}},
#           {"device_id": {"device_uuid": {"uuid": "RT6"}}, "endpoint_uuid": {"uuid": "eth-dst"}}],
#        "service_id": {"context_id": {"context_uuid": {"uuid": "admin"}},
#        "service_uuid": {"uuid": "2c025055-bf6c-4250-8560-cf62f2d29e72"}},
#        "service_status": {"service_status": "SERVICESTATUS_PLANNED"},
#        "service_type": "SERVICETYPE_TE"}

def _add_endpoint(svc, device_uuid, endpoint_uuid):
    """Append an endpoint id to `svc.service_endpoint_ids` and return it."""
    entry = svc.service_endpoint_ids.add()
    entry.device_id.device_uuid.uuid = device_uuid
    entry.endpoint_uuid.uuid = endpoint_uuid
    return entry


def _add_custom_set_rule(svc, resource_key, resource_value):
    """Append a CONFIGACTION_SET custom rule to `svc` and return it.

    `resource_value` is serialized with `json.dumps` before being stored.
    """
    rule = svc.service_config.config_rules.add()
    rule.action = ConfigActionEnum.CONFIGACTION_SET
    rule.custom.resource_key = resource_key
    rule.custom.resource_value = json.dumps(resource_value)
    return rule


# Request message shared with main(): a planned TE service in context 'admin'.
service = Service()
service.service_type = ServiceTypeEnum.SERVICETYPE_TE
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service.service_id.service_uuid.uuid = 'test-te-service'
service.service_id.context_id.context_uuid.uuid = 'admin'

# Endpoints are appended in order: source first, then destination.
src_endpoint_id = _add_endpoint(service, 'RT1', 'eth-src')
dst_endpoint_id = _add_endpoint(service, 'RT6', 'eth-dst')

# Optional constraints, left disabled:
# # Capacity SLA
# sla_capacity = service.service_constraints.add()
# sla_capacity.sla_capacity.capacity_gbps = 10.0

# # Latency SLA
# sla_latency = service.service_constraints.add()
# sla_latency.sla_latency.e2e_latency_ms = 20.0

# Example config rules ('/lsp-fw' first, then '/lsp-bw'):
config_rule_1 = _add_custom_set_rule(
    service, '/lsp-fw', {'binding_label': 1111, 'symbolic_name': "foo"},
)
config_rule_2 = _add_custom_set_rule(
    service, '/lsp-bw', {'binding_label': 6666, 'symbolic_name': "bar"},
)

def main():
    """Send the module-level `service` to the TE service via RequestLSP.

    Prints the request and the response as JSON strings. The client is
    closed even when the call (or the printing) raises, so a failed request
    does not leave the connection open.

    Returns:
        0, which the script entry point passes to `sys.exit`.
    """
    # Connect (replace the placeholder host with the TE service IP):
    te_service_client = TEServiceClient(host='XXX.XXX.XXX.XXX', port=10030)

    try:
        # RequestLSP
        print('request:', grpc_message_to_json_string(service))
        service_status = te_service_client.RequestLSP(service)
        print('response:', grpc_message_to_json_string(service_status))

        # DeleteLSP
        #print('request:', grpc_message_to_json_string(service))
        #service_status = te_service_client.DeleteLSP(service)
        #print('response:', grpc_message_to_json_string(service_status))
    finally:
        # Close: always release the client, including on RPC errors.
        te_service_client.close()

    return 0

if __name__ == '__main__':
    # Run the test call and use main()'s return value as the exit status.
    exit_code = main()
    sys.exit(exit_code)