Skip to content
Snippets Groups Projects
Commit e45720d1 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

InterDomain component:

- Migrated to use new generic gRPC servicer and clients
- Migrated to use new settings framework
- Commented wrong/outdated unitary tests (to be coded)
- Minor code formatting/styling
parent eb1c97b8
No related branches found
No related tags found
1 merge request!54Release 2.0.0
......@@ -13,6 +13,8 @@
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from interdomain.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatus, TeraFlowController
......@@ -24,8 +26,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class InterdomainClient:
def __init__(self, address, port):
self.endpoint = '{:s}:{:s}'.format(str(address), str(port))
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.INTERDOMAIN)
if not port: port = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
......
......@@ -12,65 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from context.client.ContextClient import ContextClient
from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from interdomain.proto.interdomain_pb2_grpc import add_InterdomainServiceServicer_to_server
from slice.client.SliceClient import SliceClient
from .InterdomainServiceServicerImpl import InterdomainServiceServicerImpl
from .RemoteDomainClients import RemoteDomainClients
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
class InterdomainService(GenericGrpcService):
def __init__(self, remote_domain_clients : RemoteDomainClients, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
super().__init__(port, cls_name=cls_name)
self.interdomain_servicer = InterdomainServiceServicerImpl(remote_domain_clients)
class InterdomainService:
def __init__(
self, context_client : ContextClient, slice_client : SliceClient, remote_domain_clients : RemoteDomainClients,
address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD
):
self.context_client = context_client
self.slice_client = slice_client
self.remote_domain_clients = remote_domain_clients
self.address = address
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.interdomain_servicer = None
self.health_servicer = None
self.pool = None
self.server = None
def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers)))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
self.interdomain_servicer = InterdomainServiceServicerImpl(
self.context_client, self.slice_client, self.remote_domain_clients)
def install_servicers(self):
add_InterdomainServiceServicer_to_server(self.interdomain_servicer, self.server)
self.health_servicer = HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
self.server.start()
self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
self.health_servicer.enter_graceful_shutdown()
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')
......@@ -14,7 +14,7 @@
import grpc, logging
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.tools.grpc.Tools import grpc_message_to_json_string
#from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import SliceStatusEnum
from interdomain.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatus, TeraFlowController
......@@ -29,18 +29,16 @@ METHOD_NAMES = ['RequestSlice', 'Authenticate', 'LookUpSlice', 'OrderSliceFromCa
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class InterdomainServiceServicerImpl(InterdomainServiceServicer):
def __init__(
self, context_client : ContextClient, slice_client : SliceClient,
remote_domain_clients : RemoteDomainClients
):
def __init__(self, remote_domain_clients : RemoteDomainClients):
LOGGER.debug('Creating Servicer...')
self.context_client = context_client
self.slice_client = slice_client
self.remote_domain_clients = remote_domain_clients
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
context_client = ContextClient()
slice_client = SliceClient()
domains_to_endpoints = {}
local_domain_uuid = None
for slice_endpoint_id in request.slice_endpoint_ids:
......@@ -90,7 +88,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
raise Exception('Remote Slice creation failed. Wrong Slice status returned')
#self.context_client.SetSlice(remote_slice)
#context_client.SetSlice(remote_slice)
#subslice_id = reply.slice_subslice_ids.add()
#subslice_id.CopyFrom(remote_slice.slice_id)
......@@ -112,7 +110,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
local_slice_reply = self.slice_client.CreateSlice(local_slice_request)
local_slice_reply = slice_client.CreateSlice(local_slice_request)
if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member
raise Exception('Local Slice creation failed. Wrong Slice Id was returned')
......@@ -120,7 +118,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid
subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid
self.context_client.SetSlice(reply)
context_client.SetSlice(reply)
return reply.slice_id
@safe_and_metered_rpc_method(METRICS, LOGGER)
......@@ -133,7 +131,8 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
@safe_and_metered_rpc_method(METRICS, LOGGER)
def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
try:
slice_ = self.context_client.GetSlice(request.slice_id)
context_client = ContextClient()
slice_ = context_client.GetSlice(request.slice_id)
return slice_.slice_id
except grpc.RpcError:
#LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id)))
......@@ -146,7 +145,9 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
@safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
reply = self.slice_client.CreateSlice(request)
context_client = ContextClient()
slice_client = SliceClient()
reply = slice_client.CreateSlice(request)
if reply != request.slice_id: # pylint: disable=no-member
raise Exception('Slice creation failed. Wrong Slice Id was returned')
return self.context_client.GetSlice(request.slice_id)
return context_client.GetSlice(request.slice_id)
......@@ -26,16 +26,16 @@ class RemoteDomainClients:
self.peer_domain = {}
def add_peer(
self, domain_name : str, address : str, port : int, context_uuid : str = DEFAULT_CONTEXT_UUID
self, domain_name : str, host : str, port : int, context_uuid : str = DEFAULT_CONTEXT_UUID
) -> None:
while True:
try:
remote_teraflow_ip = socket.gethostbyname(address)
remote_teraflow_ip = socket.gethostbyname(host)
if len(remote_teraflow_ip) > 0: break
except socket.gaierror as e:
if str(e) == '[Errno -2] Name or service not known': continue
interdomain_client = InterdomainClient(address, port)
interdomain_client = InterdomainClient(host=host, port=port)
request = TeraFlowController()
request.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID # pylint: disable=no-member
request.ip_address = get_setting('INTERDOMAINSERVICE_SERVICE_HOST', default='0.0.0.0')
......
......@@ -14,14 +14,12 @@
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting, wait_for_environment_variables
from context.client.ContextClient import ContextClient
from interdomain.service.RemoteDomainClients import RemoteDomainClients
from interdomain.Config import (
CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, SLICE_SERVICE_HOST, SLICE_SERVICE_PORT, GRPC_SERVICE_PORT,
GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT)
from slice.client.SliceClient import SliceClient
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
get_service_port_grpc, wait_for_environment_variables)
from .InterdomainService import InterdomainService
from .RemoteDomainClients import RemoteDomainClients
terminate = threading.Event()
LOGGER : logging.Logger = None
......@@ -33,55 +31,36 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
def main():
global LOGGER # pylint: disable=global-statement
grpc_service_port = get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
'SLICESERVICE_SERVICE_HOST', 'SLICESERVICE_SERVICE_PORT_GRPC',
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST )
context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT )
slice_service_host = get_setting('SLICESERVICE_SERVICE_HOST', default=SLICE_SERVICE_HOST )
slice_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=SLICE_SERVICE_PORT )
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Initialize Context Client
if context_service_host is None or context_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Context component'.format(
str(context_service_host), str(context_service_port)))
context_client = ContextClient(context_service_host, context_service_port)
# Initialize Slice Client
if slice_service_host is None or slice_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Slice component'.format(
str(slice_service_host), str(slice_service_port)))
slice_client = SliceClient(slice_service_host, slice_service_port)
# Define remote domain clients
remote_domain_clients = RemoteDomainClients()
# Starting Interdomain service
grpc_service = InterdomainService(
context_client, slice_client, remote_domain_clients, port=grpc_service_port, max_workers=max_workers,
grace_period=grace_period)
grpc_service = InterdomainService(remote_domain_clients)
grpc_service.start()
remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', GRPC_SERVICE_PORT)
# TODO: improve with configuration the definition of the remote peers
interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass
......
......@@ -13,134 +13,131 @@
# limitations under the License.
import logging, grpc
import os
import sqlite3
import pytest
from typing import Tuple
from interdomain.proto import context_pb2, kpi_sample_types_pb2, monitoring_pb2
from interdomain.client.interdomain_client import InterdomainClient
from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from interdomain.service.InterdomainService import InterdomainService
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
###########################
# Tests Setup
###########################
SERVER_ADDRESS = '127.0.0.1'
LISTEN_ADDRESS = '[::]'
GRPC_PORT_MONITORING = 9090
GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports
SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit
('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
]
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def interdomain_service():
LOGGER.warning('interdomain_service begin')
interdomain_port = GRPC_INTERDOMAIN_PORT
max_workers = GRPC_MAX_WORKERS
grace_period = GRPC_GRACE_PERIOD
LOGGER.info('Initializing InterdomainService...')
grpc_service = InterdomainService(port=interdomain_port, max_workers=max_workers, grace_period=grace_period)
server = grpc_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.warning('interdomain_service yielding')
yield server
LOGGER.info('Terminating InterdomainService...')
grpc_service.stop()
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def interdomain_client(interdomain_service):
LOGGER.warning('interdomain_client begin')
client = InterdomainClient(server=SERVER_ADDRESS, port=GRPC_PORT_INTERDOMAIN) # instantiate the client
LOGGER.warning('interdomain_client returning')
return client
# This fixture will be requested by test cases and last during testing session.
@pytest.fixture(scope='session')
def create_TeraFlowController():
LOGGER.warning('create_TeraFlowController begin')
# form request
tf_ctl = context_pb2.TeraFlowController()
tf_ctl.context_id = context_pb2.ContextId()
tf_ctl.context_id.context_uuid = context_pb2.Uuid()
tf_ctl.context_id.context_uuid.uuid = str(1)
tf_ctl.ip_address = "127.0.0.1"
tf_ctl.port = 9090
return tf_ctl
@pytest.fixture(scope='session')
def create_TransportSlice():
LOGGER.warning('create_TransportSlice begin')
# form request
slice_req = slice_pb2.TransportSlice()
slice_req.contextId = context_pb2.ContextId()
slice_req.contextId.context_uuid = context_pb2.Uuid()
slice_req.contextId.context_uuid.uuid = str(1)
slice_req.slice_id = context_pb2.Uuid()
slice_req.slice_id.context_uuid.uuid = str(1)
return slice_req
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_Authenticate(interdomain_client,create_TeraFlowController):
# make call to server
LOGGER.warning('test_Authenticate requesting')
response = interdomain_client.Authenticate(create_TeraFlowController)
LOGGER.debug(str(response))
assert isinstance(response, context.AuthenticationResult)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_LookUpSlice(interdomain_client,create_TransportSlice):
LOGGER.warning('test_LookUpSlice begin')
response = interdomain_client.LookUpSlice(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_CreateSliceAndAddToCatalog(interdomain_client,create_TransportSlice):
LOGGER.warning('test_CreateSliceAndAddToCatalog begin')
response = interdomain_client.CreateSliceAndAddToCatalog(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_OrderSliceFromCatalog(interdomain_client,create_TransportSlice):
# make call to server
LOGGER.warning('test_OrderSliceFromCatalog requesting')
response = interdomain_client.OrderSliceFromCatalog(create_TransportSlice)
LOGGER.debug(str(response))
assert isinstance(response, slice.SliceId)
#import logging, grpc
#import os
#import sqlite3
#
#import pytest
#from typing import Tuple
#
#from interdomain.proto import context_pb2, kpi_sample_types_pb2, monitoring_pb2
#from interdomain.client.interdomain_client import InterdomainClient
#from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
#from interdomain.service.InterdomainService import InterdomainService
#
#from common.orm.Database import Database
#from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
#from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
#from common.message_broker.MessageBroker import MessageBroker
#
#LOGGER = logging.getLogger(__name__)
#LOGGER.setLevel(logging.DEBUG)
#
############################
## Tests Setup
############################
#
#SERVER_ADDRESS = '127.0.0.1'
#LISTEN_ADDRESS = '[::]'
#GRPC_PORT_MONITORING = 9090
#
#GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports
#
#SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit
# ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
#]
#
#
## This fixture will be requested by test cases and last during testing session
#@pytest.fixture(scope='session')
#def interdomain_service():
# LOGGER.warning('interdomain_service begin')
#
# interdomain_port = GRPC_INTERDOMAIN_PORT
# max_workers = GRPC_MAX_WORKERS
# grace_period = GRPC_GRACE_PERIOD
#
# LOGGER.info('Initializing InterdomainService...')
# grpc_service = InterdomainService(port=interdomain_port, max_workers=max_workers, grace_period=grace_period)
# server = grpc_service.start()
#
# # yield the server, when test finishes, execution will resume to stop it
# LOGGER.warning('interdomain_service yielding')
# yield server
#
# LOGGER.info('Terminating InterdomainService...')
# grpc_service.stop()
#
## This fixture will be requested by test cases and last during testing session.
## The client requires the server, so client fixture has the server as dependency.
#@pytest.fixture(scope='session')
#def interdomain_client(interdomain_service):
# LOGGER.warning('interdomain_client begin')
# client = InterdomainClient(server=SERVER_ADDRESS, port=GRPC_PORT_INTERDOMAIN) # instantiate the client
# LOGGER.warning('interdomain_client returning')
# return client
#
## This fixture will be requested by test cases and last during testing session.
#@pytest.fixture(scope='session')
#def create_TeraFlowController():
# LOGGER.warning('create_TeraFlowController begin')
# # form request
# tf_ctl = context_pb2.TeraFlowController()
# tf_ctl.context_id = context_pb2.ContextId()
# tf_ctl.context_id.context_uuid = context_pb2.Uuid()
# tf_ctl.context_id.context_uuid.uuid = str(1)
# tf_ctl.ip_address = "127.0.0.1"
# tf_ctl.port = 9090
# return tf_ctl
#
#@pytest.fixture(scope='session')
#def create_TransportSlice():
# LOGGER.warning('create_TransportSlice begin')
#
# # form request
# slice_req = slice_pb2.TransportSlice()
# slice_req.contextId = context_pb2.ContextId()
# slice_req.contextId.context_uuid = context_pb2.Uuid()
# slice_req.contextId.context_uuid.uuid = str(1)
# slice_req.slice_id = context_pb2.Uuid()
# slice_req.slice_id.context_uuid.uuid = str(1)
#
# return slice_req
#
#
############################
## Tests Implementation
############################
#
#
## Test case that makes use of client fixture to test server's CreateKpi method
#def test_Authenticate(interdomain_client,create_TeraFlowController):
# # make call to server
# LOGGER.warning('test_Authenticate requesting')
# response = interdomain_client.Authenticate(create_TeraFlowController)
# LOGGER.debug(str(response))
# assert isinstance(response, context.AuthenticationResult)
#
## Test case that makes use of client fixture to test server's MonitorKpi method
#def test_LookUpSlice(interdomain_client,create_TransportSlice):
# LOGGER.warning('test_LookUpSlice begin')
#
# response = interdomain_client.LookUpSlice(create_TransportSlice)
# LOGGER.debug(str(response))
# assert isinstance(response, slice.SliceId)
#
## Test case that makes use of client fixture to test server's GetStreamKpi method
#def test_CreateSliceAndAddToCatalog(interdomain_client,create_TransportSlice):
# LOGGER.warning('test_CreateSliceAndAddToCatalog begin')
# response = interdomain_client.CreateSliceAndAddToCatalog(create_TransportSlice)
# LOGGER.debug(str(response))
# assert isinstance(response, slice.SliceId)
#
## Test case that makes use of client fixture to test server's IncludeKpi method
#def test_OrderSliceFromCatalog(interdomain_client,create_TransportSlice):
# # make call to server
# LOGGER.warning('test_OrderSliceFromCatalog requesting')
# response = interdomain_client.OrderSliceFromCatalog(create_TransportSlice)
# LOGGER.debug(str(response))
# assert isinstance(response, slice.SliceId)
#
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment