Skip to content
Snippets Groups Projects
Commit 10ec2199 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Added L3NM-OpenConfig Service Handler

parent 6792caf1
No related branches found
No related tags found
1 merge request!54Release 2.0.0
Showing
with 528 additions and 1011 deletions
...@@ -7,7 +7,7 @@ pip install --upgrade pip setuptools wheel pip-tools pylint pytest pytest-benchm ...@@ -7,7 +7,7 @@ pip install --upgrade pip setuptools wheel pip-tools pylint pytest pytest-benchm
echo "" > requirements.in echo "" > requirements.in
#TODO: include here your component #TODO: include here your component
COMPONENTS="compute context device monitoring centralizedattackdetector opticalcentralizedattackdetector opticalattackmitigator dbscanserving" COMPONENTS="compute context device service monitoring centralizedattackdetector opticalcentralizedattackdetector opticalattackmitigator dbscanserving"
# compiling dependencies from all components # compiling dependencies from all components
for component in $COMPONENTS for component in $COMPONENTS
......
# Create a set of tests enabling to run tests as follows ...
# from common.tests.PytestGenerateTests import pytest_generate_tests # pylint: disable=unused-import
#
# scenario1 = ('basic', {'attribute': 'value'})
# scenario2 = ('advanced', {'attribute': 'value2'})
#
# class TestSampleWithScenarios:
# scenarios = [scenario1, scenario2]
#
# def test_demo1(self, attribute):
# assert isinstance(attribute, str)
#
# def test_demo2(self, attribute):
# assert isinstance(attribute, str)
#
# ... and run them as:
# $ pytest --log-level=INFO --verbose my_test.py
# =================== test session starts ===================
# platform linux -- Python 3.9.6, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 -- /home/.../.pyenv/.../bin/python3.9
# cachedir: .pytest_cache
# benchmark: 3.4.1 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0
# calibration_precision=10 warmup=False warmup_iterations=100000)
# rootdir: /home/.../tests
# plugins: benchmark-3.4.1
# collected 4 items
#
# my_test.py::TestSampleWithScenarios::test_demo1[basic] PASSED [ 25%]
# my_test.py::TestSampleWithScenarios::test_demo2[basic] PASSED [ 50%]
# my_test.py::TestSampleWithScenarios::test_demo1[advanced] PASSED [ 75%]
# my_test.py::TestSampleWithScenarios::test_demo2[advanced] PASSED [100%]
#
# ==================== 4 passed in 0.02s ====================
def pytest_generate_tests(metafunc):
idlist = []
argvalues = []
for scenario in metafunc.cls.scenarios:
idlist.append(scenario[0])
items = scenario[1].items()
argnames = [x[0] for x in items]
argvalues.append([x[1] for x in items])
metafunc.parametrize(argnames, argvalues, ids=idlist, scope='class')
from typing import Dict
import grpc, logging
from prometheus_client import Counter, Histogram
from common.database.api.Database import Database
from common.exceptions.ServiceException import ServiceException
from service.proto.context_pb2 import Empty
from service.proto.service_pb2 import ConnectionList, Service, ServiceId, ServiceList
from service.proto.service_pb2_grpc import ServiceServiceServicer
from service.service.Tools import check_service_id_request, check_service_request
LOGGER = logging.getLogger(__name__)
GETSERVICELIST_COUNTER_STARTED = Counter ('service_getservicelist_counter_started',
'Service:GetServiceList counter of requests started' )
GETSERVICELIST_COUNTER_COMPLETED = Counter ('service_getservicelist_counter_completed',
'Service:GetServiceList counter of requests completed')
GETSERVICELIST_COUNTER_FAILED = Counter ('service_getservicelist_counter_failed',
'Service:GetServiceList counter of requests failed' )
GETSERVICELIST_HISTOGRAM_DURATION = Histogram('service_getservicelist_histogram_duration',
'Service:GetServiceList histogram of request duration')
CREATESERVICE_COUNTER_STARTED = Counter ('service_createservice_counter_started',
'Service:CreateService counter of requests started' )
CREATESERVICE_COUNTER_COMPLETED = Counter ('service_createservice_counter_completed',
'Service:CreateService counter of requests completed')
CREATESERVICE_COUNTER_FAILED = Counter ('service_createservice_counter_failed',
'Service:CreateService counter of requests failed' )
CREATESERVICE_HISTOGRAM_DURATION = Histogram('service_createservice_histogram_duration',
'Service:CreateService histogram of request duration')
UPDATESERVICE_COUNTER_STARTED = Counter ('service_updateservice_counter_started',
'Service:UpdateService counter of requests started' )
UPDATESERVICE_COUNTER_COMPLETED = Counter ('service_updateservice_counter_completed',
'Service:UpdateService counter of requests completed')
UPDATESERVICE_COUNTER_FAILED = Counter ('service_updateservice_counter_failed',
'Service:UpdateService counter of requests failed' )
UPDATESERVICE_HISTOGRAM_DURATION = Histogram('service_updateservice_histogram_duration',
'Service:UpdateService histogram of request duration')
DELETESERVICE_COUNTER_STARTED = Counter ('service_deleteservice_counter_started',
'Service:DeleteService counter of requests started' )
DELETESERVICE_COUNTER_COMPLETED = Counter ('service_deleteservice_counter_completed',
'Service:DeleteService counter of requests completed')
DELETESERVICE_COUNTER_FAILED = Counter ('service_deleteservice_counter_failed',
'Service:DeleteService counter of requests failed' )
DELETESERVICE_HISTOGRAM_DURATION = Histogram('service_deleteservice_histogram_duration',
'Service:DeleteService histogram of request duration')
GETSERVICEBYID_COUNTER_STARTED = Counter ('service_getservicebyid_counter_started',
'Service:GetServiceById counter of requests started' )
GETSERVICEBYID_COUNTER_COMPLETED = Counter ('service_getservicebyid_counter_completed',
'Service:GetServiceById counter of requests completed')
GETSERVICEBYID_COUNTER_FAILED = Counter ('service_getservicebyid_counter_failed',
'Service:GetServiceById counter of requests failed' )
GETSERVICEBYID_HISTOGRAM_DURATION = Histogram('service_getservicebyid_histogram_duration',
'Service:GetServiceById histogram of request duration')
GETCONNECTIONLIST_COUNTER_STARTED = Counter ('service_getconnectionlist_counter_started',
'Service:GetConnectionList counter of requests started' )
GETCONNECTIONLIST_COUNTER_COMPLETED = Counter ('service_getconnectionlist_counter_completed',
'Service:GetConnectionList counter of requests completed')
GETCONNECTIONLIST_COUNTER_FAILED = Counter ('service_getconnectionlist_counter_failed',
'Service:GetConnectionList counter of requests failed' )
GETCONNECTIONLIST_HISTOGRAM_DURATION = Histogram('service_getconnectionlist_histogram_duration',
'Service:GetConnectionList histogram of request duration')
class ServiceServiceServicerImpl(ServiceServiceServicer):
def __init__(self, database : Database):
LOGGER.debug('Creating Servicer...')
self.database = database
LOGGER.debug('Servicer Created')
@GETSERVICELIST_HISTOGRAM_DURATION.time()
def GetServiceList(self, request : Empty, grpc_context : grpc.ServicerContext) -> ServiceList:
GETSERVICELIST_COUNTER_STARTED.inc()
try:
LOGGER.debug('GetServiceList request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
# ----- Retrieve data from the database --------------------------------------------------------------------
db_context_uuids = self.database.contexts.get()
json_services = []
for db_context_uuid in db_context_uuids:
db_context = self.database.context(db_context_uuid)
json_services.extend(db_context.dump_services())
# ----- Compose reply --------------------------------------------------------------------------------------
reply = ServiceList(cs=json_services)
LOGGER.debug('GetServiceList reply: {}'.format(str(reply)))
GETSERVICELIST_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e: # pragma: no cover (ServiceException not thrown)
LOGGER.exception('GetServiceList exception')
GETSERVICELIST_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetServiceList exception')
GETSERVICELIST_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
@CREATESERVICE_HISTOGRAM_DURATION.time()
def CreateService(self, request : Service, grpc_context : grpc.ServicerContext) -> ServiceId:
CREATESERVICE_COUNTER_STARTED.inc()
try:
LOGGER.debug('CreateService request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
context_id, service_id, service_type, service_config, service_state, db_endpoints, constraint_tuples = \
check_service_request('CreateService', request, self.database, LOGGER)
# ----- Implement changes in the database ------------------------------------------------------------------
db_context = self.database.context(context_id)
db_service = db_context.service(service_id)
db_service.create(service_type, service_config, service_state)
for db_endpoint in db_endpoints:
service_endpoint_id = '{}:{}/{}'.format(
db_endpoint.topology_uuid, db_endpoint.device_uuid, db_endpoint.endpoint_uuid)
db_service.endpoint(service_endpoint_id).create(db_endpoint)
for cons_type,cons_value in constraint_tuples: db_service.constraint(cons_type).create(cons_value)
# ----- Compose reply --------------------------------------------------------------------------------------
reply = ServiceId(**db_service.dump_id())
LOGGER.debug('CreateService reply: {}'.format(str(reply)))
CREATESERVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
LOGGER.exception('CreateService exception')
CREATESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateService exception')
CREATESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
@UPDATESERVICE_HISTOGRAM_DURATION.time()
def UpdateService(self, request : Service, grpc_context : grpc.ServicerContext) -> ServiceId:
UPDATESERVICE_COUNTER_STARTED.inc()
try:
LOGGER.debug('UpdateService request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
context_id, service_id, service_type, service_config, service_state, db_endpoints, constraint_tuples = \
check_service_request('UpdateService', request, self.database, LOGGER)
# ----- Implement changes in the database ------------------------------------------------------------------
db_context = self.database.context(context_id)
db_service = db_context.service(service_id)
# Update service attributes
db_service.update(update_attributes={
'service_type' : service_type,
'service_config': service_config,
'service_state' : service_state,
})
# Update service constraints; first add missing, then remove existing, but not added to Service
db_service_constraint_types = set(db_service.constraints.get())
for constraint_type,constraint_value in constraint_tuples:
if constraint_type in db_service_constraint_types:
db_service.constraint(constraint_type).update(update_attributes={
'constraint_value': constraint_value
})
else:
db_service.constraint(constraint_type).create(constraint_value)
db_service_constraint_types.discard(constraint_type)
for constraint_type in db_service_constraint_types:
db_service.constraint(constraint_type).delete()
# Update service endpoints; first add missing, then remove existing, but not added to Service
db_service_endpoint_uuids = set(db_service.endpoints.get())
for db_endpoint in db_endpoints:
service_endpoint_id = '{}:{}/{}'.format(
db_endpoint.topology_uuid, db_endpoint.device_uuid, db_endpoint.endpoint_uuid)
if service_endpoint_id not in db_service_endpoint_uuids:
db_service.endpoint(service_endpoint_id).create(db_endpoint)
db_service_endpoint_uuids.discard(service_endpoint_id)
for db_service_endpoint_uuid in db_service_endpoint_uuids:
db_service.endpoint(db_service_endpoint_uuid).delete()
# ----- Compose reply --------------------------------------------------------------------------------------
reply = ServiceId(**db_service.dump_id())
LOGGER.debug('UpdateService reply: {}'.format(str(reply)))
UPDATESERVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
LOGGER.exception('UpdateService exception')
UPDATESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('UpdateService exception')
UPDATESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
@DELETESERVICE_HISTOGRAM_DURATION.time()
def DeleteService(self, request : ServiceId, grpc_context : grpc.ServicerContext) -> Empty:
DELETESERVICE_COUNTER_STARTED.inc()
try:
LOGGER.debug('DeleteService request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
context_id, service_id = check_service_id_request('DeleteService', request, self.database, LOGGER)
# ----- Implement changes in the database ------------------------------------------------------------------
db_context = self.database.context(context_id)
db_service = db_context.service(service_id)
db_service.delete()
# ----- Compose reply --------------------------------------------------------------------------------------
reply = Empty()
LOGGER.debug('DeleteService reply: {}'.format(str(reply)))
DELETESERVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
LOGGER.exception('DeleteService exception')
DELETESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteService exception')
DELETESERVICE_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
@GETSERVICEBYID_HISTOGRAM_DURATION.time()
def GetServiceById(self, request : ServiceId, grpc_context : grpc.ServicerContext) -> Service:
GETSERVICEBYID_COUNTER_STARTED.inc()
try:
LOGGER.debug('GetServiceById request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
context_id, service_id = check_service_id_request('GetServiceById', request, self.database, LOGGER)
# ----- Retrieve data from the database --------------------------------------------------------------------
db_context = self.database.context(context_id)
db_service = db_context.service(service_id)
# ----- Compose reply --------------------------------------------------------------------------------------
reply = Service(**db_service.dump())
LOGGER.debug('GetServiceById reply: {}'.format(str(reply)))
GETSERVICEBYID_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
LOGGER.exception('GetServiceById exception')
GETSERVICEBYID_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetServiceById exception')
GETSERVICEBYID_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
@GETCONNECTIONLIST_HISTOGRAM_DURATION.time()
def GetConnectionList(self, request : Empty, grpc_context : grpc.ServicerContext) -> ConnectionList:
GETCONNECTIONLIST_COUNTER_STARTED.inc()
try:
LOGGER.debug('GetConnectionList request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
# ----- Retrieve data from the database --------------------------------------------------------------------
raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, 'RPC GetConnectionList() not implemented')
# ----- Compose reply --------------------------------------------------------------------------------------
#reply = ConnectionList()
#LOGGER.debug('GetConnectionList reply: {}'.format(str(reply)))
#GETCONNECTIONLIST_COUNTER_COMPLETED.inc()
#return reply
except ServiceException as e:
LOGGER.exception('GetConnectionList exception')
GETCONNECTIONLIST_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetConnectionList exception')
GETCONNECTIONLIST_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
import grpc, logging
from typing import Dict, List, Set, Tuple
from common.Checkers import chk_options, chk_string
from common.database.api.Database import Database
from common.database.api.context.Constants import DEFAULT_TOPOLOGY_ID
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.database.api.context.service.ServiceState import ServiceState, servicestate_enum_values, \
to_servicestate_enum
from common.database.api.context.service.ServiceType import ServiceType, servicetype_enum_values, to_servicetype_enum
from common.exceptions.ServiceException import ServiceException
from common.tools.service.DeviceCheckers import check_device_endpoint_exists
from common.tools.service.EndpointIdCheckers import check_endpoint_id
from common.tools.service.EnumCheckers import check_enum
from common.tools.service.ServiceCheckers import check_service_exists, check_service_not_exists
from service.proto.context_pb2 import Constraint
from service.proto.service_pb2 import Service, ServiceId
# For each method name, define acceptable service types. Empty set means accept all.
ACCEPTED_SERVICE_TYPES : Dict[str, Set[ServiceType]] = {
'CreateService': set([ServiceType.L2NM, ServiceType.L3NM, ServiceType.TAPI_CONNECTIVITY_SERVICE]),
'UpdateService': set([ServiceType.L2NM, ServiceType.L3NM, ServiceType.TAPI_CONNECTIVITY_SERVICE]),
}
# For each method name, define acceptable service states. Empty set means accept all.
ACCEPTED_SERVICE_STATES : Dict[str, Set[ServiceState]] = {
'CreateService': set([ServiceState.PLANNED]),
'UpdateService': set([ServiceState.PLANNED, ServiceState.ACTIVE, ServiceState.PENDING_REMOVAL]),
}
def _check_service_exists(method_name : str, database : Database, context_id : str, service_id : str):
if method_name in ['CreateService']:
check_service_not_exists(database, context_id, service_id)
elif method_name in ['UpdateService', 'DeleteService', 'GetServiceById']:
check_service_exists(database, context_id, service_id)
else: # pragma: no cover (test requires malforming the code)
msg = 'Unexpected condition [_check_service_exists(method_name={}, context_id={}, service_id={})]'
msg = msg.format(str(method_name), str(context_id), str(service_id))
raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg)
def check_service_type(method_name : str, value : str) -> ServiceType:
return check_enum('ServiceType', method_name, value, to_servicetype_enum, ACCEPTED_SERVICE_TYPES)
def check_service_state(method_name : str, value : str) -> ServiceState:
return check_enum('ServiceState', method_name, value, to_servicestate_enum, ACCEPTED_SERVICE_STATES)
def check_service_constraint(
logger : logging.Logger, constraint_number : int, parent_name : str, constraint : Constraint,
add_constraints : Dict[str, Dict[str, Set[str]]]) -> Tuple[str, str]:
try:
constraint_type = chk_string('constraint[#{}].constraint_type'.format(constraint_number),
constraint.constraint_type,
allow_empty=False)
constraint_value = chk_string('constraint[#{}].constraint_value'.format(constraint_number),
constraint.constraint_value,
allow_empty=False)
except Exception as e:
logger.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
if constraint_type in add_constraints:
msg = 'Duplicated ConstraintType({}) in {}.'
msg = msg.format(constraint_type, parent_name)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
add_constraints[constraint_type] = constraint_value
return constraint_type, constraint_value
def check_service_request(
method_name : str, request : Service, database : Database, logger : logging.Logger
) -> Tuple[str, str, ServiceType, str, ServiceState, List[Endpoint], List[Tuple[str, str]]]:
# ----- Parse attributes -------------------------------------------------------------------------------------------
try:
context_id = chk_string ('service.cs_id.contextId.contextUuid.uuid',
request.cs_id.contextId.contextUuid.uuid,
allow_empty=False)
service_id = chk_string ('service.cs_id.cs_id.uuid',
request.cs_id.cs_id.uuid,
allow_empty=False)
service_type = chk_options('service.serviceType',
request.serviceType,
servicetype_enum_values())
service_config = chk_string ('service.serviceConfig.serviceConfig',
request.serviceConfig.serviceConfig,
allow_empty=True)
service_state = chk_options('service.serviceState.serviceState',
request.serviceState.serviceState,
servicestate_enum_values())
except Exception as e:
logger.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
service_type = check_service_type(method_name, service_type)
service_state = check_service_state(method_name, service_state)
# ----- Check if service exists in database ------------------------------------------------------------------------
_check_service_exists(method_name, database, context_id, service_id)
# ----- Parse constraints ------------------------------------------------------------------------------------------
add_constraints : Dict[str, str] = {}
constraint_tuples : List[Tuple[str, str]] = []
for constraint_number,constraint in enumerate(request.constraint):
parent_name = 'Constraint(#{}) of Context({})/Service({})'.format(constraint_number, context_id, service_id)
constraint_type, constraint_value = check_service_constraint(
logger, constraint_number, parent_name, constraint, add_constraints)
constraint_tuples.append((constraint_type, constraint_value))
# ----- Parse endpoints and check if they exist in the database as device endpoints --------------------------------
add_topology_devices_endpoints : Dict[str, Dict[str, Set[str]]] = {}
db_endpoints : List[Endpoint] = []
for endpoint_number,endpoint_id in enumerate(request.endpointList):
parent_name = 'Endpoint(#{}) of Context({})/Service({})'.format(endpoint_number, context_id, service_id)
ep_topology_id, ep_device_id, ep_port_id = check_endpoint_id(
logger, endpoint_number, parent_name, endpoint_id, add_topology_devices_endpoints,
predefined_context_id=context_id, acceptable_context_ids=set([context_id]))
db_endpoint = check_device_endpoint_exists(
database, parent_name, context_id, ep_topology_id, ep_device_id, ep_port_id)
db_endpoints.append(db_endpoint)
return context_id, service_id, service_type, service_config, service_state, db_endpoints, constraint_tuples
def check_service_id_request(
method_name : str, request : ServiceId, database : Database, logger : logging.Logger) -> Tuple[str, str]:
# ----- Parse attributes -------------------------------------------------------------------------------------------
try:
context_id = chk_string ('service_id.contextId.contextUuid.uuid',
request.contextId.contextUuid.uuid,
allow_empty=False)
service_id = chk_string ('service_id.cs_id.uuid',
request.cs_id.uuid,
allow_empty=False)
except Exception as e:
logger.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
# ----- Check if service exists in database ------------------------------------------------------------------------
_check_service_exists(method_name, database, context_id, service_id)
return context_id, service_id
import copy, grpc, logging, pytest
from google.protobuf.json_format import MessageToDict
from common.database.Factory import get_database, DatabaseEngineEnum
from common.database.api.Database import Database
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.tests.script import populate_example
from common.tests.Assertions import validate_empty, validate_service, validate_service_id, \
validate_service_list_is_empty, validate_service_list_is_not_empty
from service.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from service.client.ServiceClient import ServiceClient
from service.proto.context_pb2 import Empty
from service.proto.service_pb2 import Service, ServiceId, ServiceStateEnum, ServiceType
from service.service.ServiceService import ServiceService
port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# use "copy.deepcopy" to prevent propagating forced changes during tests
CONTEXT_ID = {'contextUuid': {'uuid': DEFAULT_CONTEXT_ID}}
TOPOLOGY_ID = {'contextId': copy.deepcopy(CONTEXT_ID), 'topoId': {'uuid': DEFAULT_TOPOLOGY_ID}}
SERVICE_ID = {'contextId': copy.deepcopy(CONTEXT_ID), 'cs_id': {'uuid': 'DEV1'}}
SERVICE = {
'cs_id': copy.deepcopy(SERVICE_ID),
'serviceType': ServiceType.L3NM,
'serviceConfig': {'serviceConfig': '<config/>'},
'serviceState': {'serviceState': ServiceStateEnum.PLANNED},
'constraint': [
{'constraint_type': 'latency_ms', 'constraint_value': '100'},
{'constraint_type': 'hops', 'constraint_value': '5'},
],
'endpointList' : [
{'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV1'}}, 'port_id': {'uuid' : 'EP5'}},
{'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV2'}}, 'port_id': {'uuid' : 'EP5'}},
{'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV3'}}, 'port_id': {'uuid' : 'EP5'}},
]
}
@pytest.fixture(scope='session')
def database():
_database = get_database(engine=DatabaseEngineEnum.INMEMORY)
populate_example(_database, add_services=False)
return _database
@pytest.fixture(scope='session')
def service_service(database):
_service = ServiceService(
database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def service_client(service_service):
_client = ServiceClient(address='127.0.0.1', port=port)
yield _client
_client.close()
def test_get_services_empty(service_client : ServiceClient):
# should work
validate_service_list_is_empty(MessageToDict(
service_client.GetServiceList(Empty()),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_create_service_wrong_service_attributes(service_client : ServiceClient):
# should fail with wrong service context
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['cs_id']['contextId']['contextUuid']['uuid'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'service.cs_id.contextId.contextUuid.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with service context does not exist
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['cs_id']['contextId']['contextUuid']['uuid'] = 'wrong-context'
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(wrong-context) does not exist in the database.'
assert e.value.details() == msg
# should fail with wrong service id
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['cs_id']['cs_id']['uuid'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'service.cs_id.cs_id.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with wrong service type
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['serviceType'] = ServiceType.UNKNOWN
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Method(CreateService) does not accept ServiceType(UNKNOWN). '\
'Permitted values for Method(CreateService) are '\
'ServiceType([\'L2NM\', \'L3NM\', \'TAPI_CONNECTIVITY_SERVICE\']).'
assert e.value.details() == msg
# should fail with wrong service state
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['serviceState']['serviceState'] = ServiceStateEnum.PENDING_REMOVAL
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Method(CreateService) does not accept ServiceState(PENDING_REMOVAL). '\
'Permitted values for Method(CreateService) are '\
'ServiceState([\'PLANNED\']).'
assert e.value.details() == msg
def test_create_service_wrong_constraint(service_client : ServiceClient):
# should fail with wrong constraint type
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['constraint'][0]['constraint_type'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'constraint[#0].constraint_type() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with wrong constraint value
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['constraint'][0]['constraint_value'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'constraint[#0].constraint_value() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with dupplicated constraint type
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['constraint'][1] = copy_service['constraint'][0]
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Duplicated ConstraintType(latency_ms) in Constraint(#1) of Context(admin)/Service(DEV1).'
assert e.value.details() == msg
def test_create_service_wrong_endpoint(service_client : ServiceClient, database : Database):
# should fail with wrong endpoint context
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
print(copy_service)
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Context(wrong-context) in Endpoint(#0) of '\
'Context(admin)/Service(DEV1) mismatches acceptable Contexts({\'admin\'}). '\
'Optionally, leave field empty to use predefined Context(admin).'
assert e.value.details() == msg
# should fail with wrong endpoint topology
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['topoId']['topoId']['uuid'] = 'wrong-topo'
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\
'Context(admin)/Service(DEV1) mismatches acceptable Topologies({\'admin\'}). '\
'Optionally, leave field empty to use predefined Topology(admin).'
assert e.value.details() == msg
# should fail with endpoint device is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['dev_id']['device_id']['uuid'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'endpoint_id[#0].dev_id.device_id.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with endpoint device not found
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['dev_id']['device_id']['uuid'] = 'wrong-device'
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\
'Context(admin)/Service(DEV1) does not exist in the database.'
assert e.value.details() == msg
# should fail with endpoint device duplicated
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][1] = copy_service['endpointList'][0]
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1) in Endpoint(#1) of '\
'Context(admin)/Service(DEV1).'
assert e.value.details() == msg
# should fail with endpoint port is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['port_id']['uuid'] = ''
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with endpoint port not found
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['port_id']['uuid'] = 'wrong-port'
service_client.CreateService(Service(**copy_service))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(admin)/Topology(admin)/Device(DEV1)/Port(wrong-port) in Endpoint(#0) of '\
'Context(admin)/Service(DEV1) does not exist in the database.'
assert e.value.details() == msg
def test_get_service_does_not_exist(service_client : ServiceClient):
# should fail with service context does not exist
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service_id = copy.deepcopy(SERVICE_ID)
copy_service_id['contextId']['contextUuid']['uuid'] = 'wrong-context'
service_client.GetServiceById(ServiceId(**copy_service_id))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(wrong-context) does not exist in the database.'
assert e.value.details() == msg
# should fail with service does not exist
with pytest.raises(grpc._channel._InactiveRpcError) as e:
service_client.GetServiceById(ServiceId(**SERVICE_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(admin)/Service(DEV1) does not exist in the database.'
assert e.value.details() == msg
def test_update_service_does_not_exist(service_client : ServiceClient):
# should fail with service does not exist
with pytest.raises(grpc._channel._InactiveRpcError) as e:
service_client.UpdateService(Service(**SERVICE))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(admin)/Service(DEV1) does not exist in the database.'
assert e.value.details() == msg
def test_create_service(service_client : ServiceClient):
# should work
validate_service_id(MessageToDict(
service_client.CreateService(Service(**SERVICE)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_create_service_already_exists(service_client : ServiceClient):
# should fail with service already exists
with pytest.raises(grpc._channel._InactiveRpcError) as e:
service_client.CreateService(Service(**SERVICE))
assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
msg = 'Context(admin)/Service(DEV1) already exists in the database.'
assert e.value.details() == msg
def test_get_service(service_client : ServiceClient):
# should work
validate_service(MessageToDict(
service_client.GetServiceById(ServiceId(**SERVICE_ID)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_update_service(service_client : ServiceClient):
# should work
copy_service = copy.deepcopy(SERVICE)
copy_service['serviceConfig']['serviceConfig'] = '<newconfig/>'
copy_service['serviceState']['serviceState'] = ServiceStateEnum.ACTIVE
copy_service['constraint'] = [
{'constraint_type': 'latency_ms', 'constraint_value': '200'},
{'constraint_type': 'bandwidth_gbps', 'constraint_value': '100'},
]
copy_service['endpointList'] = [
{
'topoId': {'contextId': {'contextUuid': {'uuid': 'admin'}}, 'topoId': {'uuid': 'admin'}},
'dev_id': {'device_id': {'uuid': 'DEV1'}},
'port_id': {'uuid' : 'EP5'}
},
{
'topoId': {'contextId': {'contextUuid': {'uuid': 'admin'}}, 'topoId': {'uuid': 'admin'}},
'dev_id': {'device_id': {'uuid': 'DEV2'}},
'port_id': {'uuid' : 'EP6'}
},
]
validate_service_id(MessageToDict(
service_client.UpdateService(Service(**copy_service)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_delete_service_wrong_service_id(service_client : ServiceClient):
# should fail with service context is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service_id = copy.deepcopy(SERVICE_ID)
copy_service_id['contextId']['contextUuid']['uuid'] = ''
service_client.DeleteService(ServiceId(**copy_service_id))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'service_id.contextId.contextUuid.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with service context does not exist
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service_id = copy.deepcopy(SERVICE_ID)
copy_service_id['contextId']['contextUuid']['uuid'] = 'wrong-context'
service_client.DeleteService(ServiceId(**copy_service_id))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(wrong-context) does not exist in the database.'
assert e.value.details() == msg
# should fail with service id is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service_id = copy.deepcopy(SERVICE_ID)
copy_service_id['cs_id']['uuid'] = ''
service_client.DeleteService(ServiceId(**copy_service_id))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'service_id.cs_id.uuid() is out of range: '\
'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
assert e.value.details() == msg
# should fail with service id is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_service_id = copy.deepcopy(SERVICE_ID)
copy_service_id['cs_id']['uuid'] = 'wrong-service'
service_client.DeleteService(ServiceId(**copy_service_id))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
msg = 'Context(admin)/Service(wrong-service) does not exist in the database.'
assert e.value.details() == msg
def test_delete_service(service_client : ServiceClient):
# should work
validate_empty(MessageToDict(
service_client.DeleteService(ServiceId(**SERVICE_ID)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_get_services_empty_2(service_client : ServiceClient):
# should work
validate_service_list_is_empty(MessageToDict(
service_client.GetServiceList(Empty()),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_create_service_empty_endpoints(service_client : ServiceClient):
# should work
copy_service = copy.deepcopy(SERVICE)
copy_service['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = ''
copy_service['endpointList'][0]['topoId']['topoId']['uuid'] = ''
validate_service_id(MessageToDict(
service_client.CreateService(Service(**copy_service)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_get_services_full(service_client : ServiceClient):
# should work
validate_service_list_is_not_empty(MessageToDict(
service_client.GetServiceList(Empty()),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
import json import json
from copy import deepcopy
from typing import Any, Dict, Union from typing import Any, Dict, Union
from service.proto.context_pb2 import ConfigActionEnum from service.proto.context_pb2 import ConfigActionEnum
...@@ -16,13 +15,24 @@ def config_rule_delete(resource_key : str, resource_value : Union[str, Dict[str, ...@@ -16,13 +15,24 @@ def config_rule_delete(resource_key : str, resource_value : Union[str, Dict[str,
def constraint(constraint_type, constraint_value): def constraint(constraint_type, constraint_value):
return {'constraint_type': str(constraint_type), 'constraint_value': str(constraint_value)} return {'constraint_type': str(constraint_type), 'constraint_value': str(constraint_value)}
def endpoint_id(device_id, endpoint_uuid, topology_id=None): def context_id(context_uuid):
result = {'device_id': deepcopy(device_id), 'endpoint_uuid': {'uuid': endpoint_uuid}} return {'context_uuid': {'uuid': context_uuid}}
if topology_id is not None: result['topology_id'] = deepcopy(topology_id)
def topology_id(topology_uuid, context_uuid=None):
result = {'topology_uuid': {'uuid': topology_uuid}}
if context_uuid is not None: result['context_id'] = context_id(context_uuid)
return result
def device_id(device_uuid):
return {'device_uuid': {'uuid': device_uuid}}
def endpoint_id(device_uuid, endpoint_uuid, context_uuid=None, topology_uuid=None):
result = {'device_id': device_id(device_uuid), 'endpoint_uuid': {'uuid': endpoint_uuid}}
if topology_id is not None: result['topology_id'] = topology_id(topology_uuid, context_uuid=context_uuid)
return result return result
def endpoint(device_id, endpoint_uuid, endpoint_type, topology_id=None): def endpoint(device_uuid, endpoint_uuid, endpoint_type, context_uuid=None, topology_uuid=None):
return { return {
'endpoint_id': endpoint_id(device_id, endpoint_uuid, topology_id=topology_id), 'endpoint_id': endpoint_id(device_uuid, endpoint_uuid, context_uuid=context_uuid, topology_uuid=topology_uuid),
'endpoint_type': endpoint_type, 'endpoint_type': endpoint_type,
} }
from ..service_handler_api.FilterFields import FilterFieldEnum, ORM_DeviceDriverEnum, ORM_ServiceTypeEnum from ..service_handler_api.FilterFields import FilterFieldEnum, ORM_DeviceDriverEnum, ORM_ServiceTypeEnum
from .l3nm_emulated.L3NMEmulatedServiceHandler import L3NMEmulatedServiceHandler from .l3nm_emulated.L3NMEmulatedServiceHandler import L3NMEmulatedServiceHandler
from .l3nm_openconfig.L3NMOpenConfigServiceHandler import L3NMOpenConfigServiceHandler
SERVICE_HANDLERS = [ SERVICE_HANDLERS = [
(L3NMEmulatedServiceHandler, [ (L3NMEmulatedServiceHandler, [
...@@ -8,4 +9,10 @@ SERVICE_HANDLERS = [ ...@@ -8,4 +9,10 @@ SERVICE_HANDLERS = [
FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.UNDEFINED, FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.UNDEFINED,
} }
]), ]),
(L3NMOpenConfigServiceHandler, [
{
FilterFieldEnum.SERVICE_TYPE : ORM_ServiceTypeEnum.L3NM,
FilterFieldEnum.DEVICE_DRIVER : ORM_DeviceDriverEnum.OPENCONFIG,
}
]),
] ]
import anytree, json, logging
from typing import Any, Dict, List, Optional, Tuple, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.type_checkers.Checkers import chk_length, chk_type
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.proto.context_pb2 import Device
from service.service.database.ConfigModel import ORM_ConfigActionEnum, get_config_rules
from service.service.database.ContextModel import ContextModel
from service.service.database.DeviceModel import DeviceModel
from service.service.database.ServiceModel import ServiceModel
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.AnyTreeTools import TreeNode, delete_subnode, get_subnode, set_subnode_value
from service.service.service_handlers.Tools import config_rule_set, config_rule_delete
LOGGER = logging.getLogger(__name__)
class L3NMOpenConfigServiceHandler(_ServiceHandler):
def __init__( # pylint: disable=super-init-not-called
self, db_service : ServiceModel, database : Database, context_client : ContextClient,
device_client : DeviceClient, **settings
) -> None:
self.__db_service = db_service
self.__database = database
self.__context_client = context_client # pylint: disable=unused-private-member
self.__device_client = device_client
self.__db_context : ContextModel = get_object(self.__database, ContextModel, self.__db_service.context_fk)
str_service_key = key_to_str([self.__db_context.context_uuid, self.__db_service.service_uuid])
db_config = get_config_rules(self.__database, str_service_key, 'running')
self.__resolver = anytree.Resolver(pathattr='name')
self.__config = TreeNode('.')
for action, resource_key, resource_value in db_config:
if action == ORM_ConfigActionEnum.SET:
try:
resource_value = json.loads(resource_value)
except: # pylint: disable=bare-except
pass
set_subnode_value(self.__resolver, self.__config, resource_key, resource_value)
elif action == ORM_ConfigActionEnum.DELETE:
delete_subnode(self.__resolver, self.__config, resource_key)
def SetEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]:
chk_type('endpoints', endpoints, list)
if len(endpoints) == 0: return []
service_uuid = self.__db_service.service_uuid
network_instance_name = '{:s}-NetInst'.format(service_uuid)
network_interface_name = '{:s}-NetIf'.format(service_uuid)
network_subinterface_name = '{:s}-NetSubIf'.format(service_uuid)
settings : TreeNode = get_subnode(self.__resolver, self.__config, 'settings', None)
if settings is None: raise Exception('Unable to retrieve service settings')
json_settings : Dict = settings.value
route_distinguisher = json_settings.get('route_distinguisher', '0:0') # '60001:801'
mtu = json_settings.get('mtu', 1450 ) # 1512
address_families = json_settings.get('address_families', [] ) # ['IPV4']
results = []
for endpoint in endpoints:
try:
chk_type('endpoint', endpoint, (tuple, list))
chk_length('endpoint', endpoint, min_length=2, max_length=3)
if len(endpoint) == 2:
device_uuid, endpoint_uuid = endpoint
else:
device_uuid, endpoint_uuid, _ = endpoint # ignore topology_uuid by now
endpoint_settings_uri = 'device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid)
endpoint_settings : TreeNode = get_subnode(self.__resolver, self.__config, endpoint_settings_uri, None)
if endpoint_settings is None:
raise Exception('Unable to retrieve service settings for endpoint({:s})'.format(
str(endpoint_settings_uri)))
json_endpoint_settings : Dict = endpoint_settings.value
router_id = json_endpoint_settings.get('router_id', '0.0.0.0') # '10.95.0.10'
sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1
db_device : DeviceModel = get_object(self.__database, DeviceModel, device_uuid, raise_if_not_found=True)
json_device = db_device.dump(include_config_rules=False, include_drivers=True, include_endpoints=True)
json_device_config : Dict = json_device.setdefault('device_config', {})
json_device_config_rules : List = json_device_config.setdefault('config_rules', [])
json_device_config_rules.extend([
config_rule_set(
'/network_instance[{:s}]'.format(network_instance_name), {
'name': network_instance_name, 'type': 'L3VRF', 'router_id': router_id,
'route_distinguisher': route_distinguisher, 'address_families': address_families,
}),
config_rule_set(
'/interface[{:s}]'.format(endpoint_uuid), {
'name': endpoint_uuid, 'description': network_interface_name, 'mtu': mtu,
}),
config_rule_set(
'/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), {
'name': endpoint_uuid, 'index': sub_interface_index,
'description': network_subinterface_name, 'mtu': mtu,
}),
config_rule_set(
'/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, endpoint_uuid), {
'name': network_instance_name, 'id': endpoint_uuid,
}),
])
self.__device_client.ConfigureDevice(Device(**json_device))
results.append(True)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unable to SetEndpoint({:s})'.format(str(endpoint)))
results.append(e)
return results
def DeleteEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]:
chk_type('endpoints', endpoints, list)
if len(endpoints) == 0: return []
service_uuid = self.__db_service.service_uuid
network_instance_name = '{:s}-NetInst'.format(service_uuid)
results = []
for endpoint in endpoints:
try:
chk_type('endpoint', endpoint, (tuple, list))
chk_length('endpoint', endpoint, min_length=2, max_length=3)
if len(endpoint) == 2:
device_uuid, endpoint_uuid = endpoint
else:
device_uuid, endpoint_uuid, _ = endpoint # ignore topology_uuid by now
endpoint_settings_uri = 'device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid)
endpoint_settings : TreeNode = get_subnode(self.__resolver, self.__config, endpoint_settings_uri, None)
if endpoint_settings is None:
raise Exception('Unable to retrieve service settings for endpoint({:s})'.format(
str(endpoint_settings_uri)))
json_endpoint_settings : Dict = endpoint_settings.value
sub_interface_index = json_endpoint_settings.get('sub_interface_index', 0 ) # 1
db_device : DeviceModel = get_object(self.__database, DeviceModel, device_uuid, raise_if_not_found=True)
json_device = db_device.dump(include_config_rules=False, include_drivers=True, include_endpoints=True)
json_device_config : Dict = json_device.setdefault('device_config', {})
json_device_config_rules : List = json_device_config.setdefault('config_rules', [])
json_device_config_rules.extend([
config_rule_delete(
'/network_instance[{:s}]/interface[{:s}]'.format(network_instance_name, endpoint_uuid), {
'name': network_instance_name, 'id': endpoint_uuid
}),
config_rule_delete(
'/interface[{:s}]/subinterface[{:d}]'.format(endpoint_uuid, sub_interface_index), {
'name': endpoint_uuid, 'index': sub_interface_index,
}),
config_rule_delete(
'/network_instance[{:s}]'.format(network_instance_name), {
'name': network_instance_name
}),
])
self.__device_client.ConfigureDevice(Device(**json_device))
results.append(True)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unable to DeleteEndpoint({:s})'.format(str(endpoint)))
results.append(e)
return results
def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
chk_type('constraints', constraints, list)
if len(constraints) == 0: return []
msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.'
LOGGER.warning(msg.format(str(constraints)))
return [True for _ in range(len(constraints))]
def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
chk_type('constraints', constraints, list)
if len(constraints) == 0: return []
msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.'
LOGGER.warning(msg.format(str(constraints)))
return [True for _ in range(len(constraints))]
def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
chk_type('resources', resources, list)
if len(resources) == 0: return []
results = []
for resource in resources:
try:
resource_key, resource_value = resource
resource_value = json.loads(resource_value)
set_subnode_value(self.__resolver, self.__config, resource_key, resource_value)
results.append(True)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unable to SetConfig({:s})'.format(str(resource)))
results.append(e)
return results
def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
chk_type('resources', resources, list)
if len(resources) == 0: return []
results = []
for resource in resources:
try:
resource_key, _ = resource
delete_subnode(self.__resolver, self.__config, resource_key)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Unable to DeleteConfig({:s})'.format(str(resource)))
results.append(e)
return results
# Add here your files containing confidential testbed details such as IP addresses, ports, usernames, passwords, etc.
ServiceHandler_L3NM_OC.py
import operator
from copy import deepcopy from copy import deepcopy
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from service.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from service.service.service_handlers.Tools import config_rule_set
# use "deepcopy" to prevent propagating forced changes during tests # use "deepcopy" to prevent propagating forced changes during tests
...@@ -22,41 +19,3 @@ TOPOLOGY = { ...@@ -22,41 +19,3 @@ TOPOLOGY = {
'device_ids': [], 'device_ids': [],
'link_ids': [], 'link_ids': [],
} }
DEVICE_EMU1_UUID = 'EMU-1'
DEVICE_EMU1_ID = {'device_uuid': {'uuid': DEVICE_EMU1_UUID}}
DEVICE_EMU1 = {
'device_id': deepcopy(DEVICE_EMU1_ID),
'device_type': 'emulated',
'device_config': {'config_rules': []},
'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED,
'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_UNDEFINED],
'device_endpoints': [],
}
DEVICE_EMU2_UUID = 'EMU-2'
DEVICE_EMU2_ID = {'device_uuid': {'uuid': DEVICE_EMU2_UUID}}
DEVICE_EMU2 = {
'device_id': deepcopy(DEVICE_EMU2_ID),
'device_type': 'emulated',
'device_config': {'config_rules': []},
'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED,
'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_UNDEFINED],
'device_endpoints': [],
}
DEVICE_EMU_ENDPOINTS = []
for endpoint_uuid in ['EP1', 'EP2', 'EP3', 'EP100']:
DEVICE_EMU_ENDPOINTS.append((endpoint_uuid, '10Gbps', []))
DEVICE_EMU_CONNECT_RULES = [
config_rule_set('_connect/address', '127.0.0.1'),
config_rule_set('_connect/port', '0'),
config_rule_set('_connect/settings', {'endpoints': [
{
'uuid': endpoint_uuid, 'type': endpoint_type,
'sample_types': list(map(operator.attrgetter('value'), endpoint_sample_types)),
}
for endpoint_uuid,endpoint_type,endpoint_sample_types in DEVICE_EMU_ENDPOINTS
]}),
]
from copy import deepcopy
from service.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum, ServiceStatusEnum, ServiceTypeEnum
from service.service.service_handlers.Tools import config_rule_set, constraint, device_id, endpoint_id
from .CommonObjects import CONTEXT_ID
# use "deepcopy" to prevent propagating forced changes during tests
SERVICE_HANDLER_NAME = 'l3nm_emulated'
SERVICE_UUID = 'SVC_L3NM_EMU'
SERVICE_ID = {
'context_id': deepcopy(CONTEXT_ID),
'service_uuid': {'uuid': SERVICE_UUID}
}
SERVICE_DESCRIPTOR = {
'service_id': deepcopy(SERVICE_ID),
'service_type': ServiceTypeEnum.SERVICETYPE_L3NM,
'service_endpoint_ids' : [],
'service_constraints': [],
'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_PLANNED},
'service_config': {'config_rules': []},
}
DEVICE1_UUID = 'EMULATED-ROUTER-1'
DEVICE2_UUID = 'EMULATED-ROUTER-2'
DEVICE_ATTRIBUTES = { # device_uuid => {device_settings}
DEVICE1_UUID: {
'type' : 'emulated',
'address' : '10.95.86.155',
'port' : '830',
'drivers' : [DeviceDriverEnum.DEVICEDRIVER_UNDEFINED],
'endpoint': 'EP100',
},
DEVICE2_UUID: {
'type' : 'emulated',
'address' : '10.96.86.149',
'port' : '830',
'drivers' : [DeviceDriverEnum.DEVICEDRIVER_UNDEFINED],
'endpoint': 'EP100',
},
}
SERVICE_DEVICE_UUIDS = [DEVICE1_UUID, DEVICE2_UUID]
SERVICE_ENDPOINT_IDS = [
endpoint_id(device_uuid, DEVICE_ATTRIBUTES[device_uuid]['endpoint'])
for device_uuid in SERVICE_DEVICE_UUIDS
]
SERVICE_CONFIG_RULES = [
config_rule_set(
'settings', {
'route_distinguisher': '60001:801', 'mtu': 1512, 'address_families': ['IPV4']
}),
config_rule_set(
'device[{:s}]/endpoint[{:s}]/settings'.format(DEVICE1_UUID, DEVICE_ATTRIBUTES[DEVICE1_UUID]['endpoint']), {
'router_id': '10.0.0.1', 'sub_interface_index': 1,
}),
config_rule_set(
'device[{:s}]/endpoint[{:s}]/settings'.format(DEVICE2_UUID, DEVICE_ATTRIBUTES[DEVICE2_UUID]['endpoint']), {
'router_id': '10.0.0.2', 'sub_interface_index': 1,
}),
]
SERVICE_CONSTRAINTS = [
constraint('latency_ms', 15.2),
constraint('jitter_us', 1.2),
]
def get_device_descriptor(device_uuid, enabled=True):
device_operational_status = (
DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED \
if enabled else \
DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED)
return {
'device_id': device_id(device_uuid),
'device_type': DEVICE_ATTRIBUTES[device_uuid]['type'],
'device_config': {'config_rules': []},
'device_operational_status': device_operational_status,
'device_drivers': DEVICE_ATTRIBUTES[device_uuid]['drivers'],
'device_endpoints': [],
}
def get_connect_rules(device_uuid):
return [
config_rule_set('_connect/address', '127.0.0.1'),
config_rule_set('_connect/port', '0'),
config_rule_set('_connect/settings', {'endpoints': [
{'uuid': endpoint_uuid, 'type': '10Gbps', 'sample_types': []}
for endpoint_uuid in ['EP1', 'EP2', 'EP3', 'EP100']
]}),
]
TEST_SERVICE_HANDLER = (SERVICE_HANDLER_NAME, {
'service_id' : SERVICE_ID,
'service_descriptor' : SERVICE_DESCRIPTOR,
'service_endpoint_ids' : SERVICE_ENDPOINT_IDS,
'service_config_rules' : SERVICE_CONFIG_RULES,
'service_constraints' : SERVICE_CONSTRAINTS,
'service_device_uuids' : SERVICE_DEVICE_UUIDS,
'get_device_descriptor' : get_device_descriptor,
'get_connect_rules' : get_connect_rules,
})
# Add/comment in this file the service handlers to be tested by the unitry tests.
SERVICE_HANDLERS_TO_TEST = []
try:
from service.tests.ServiceHandler_L3NM_EMU import TEST_SERVICE_HANDLER
SERVICE_HANDLERS_TO_TEST.append(TEST_SERVICE_HANDLER)
except ImportError:
pass
try:
from service.tests.ServiceHandler_L3NM_OC import TEST_SERVICE_HANDLER
SERVICE_HANDLERS_TO_TEST.append(TEST_SERVICE_HANDLER)
except ImportError:
pass
from copy import deepcopy
from service.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
from .CommonObjects import CONTEXT_ID
from service.service.service_handlers.Tools import config_rule_set, constraint, endpoint_id
# use "deepcopy" to prevent propagating forced changes during tests
SERVICE_L3NM_EMU_UUID = 'SVC_L3NM_EMU'
SERVICE_L3NM_EMU_ID = {
'context_id': deepcopy(CONTEXT_ID),
'service_uuid': {'uuid': SERVICE_L3NM_EMU_UUID}
}
SERVICE_L3NM_EMU = {
'service_id': deepcopy(SERVICE_L3NM_EMU_ID),
'service_type': ServiceTypeEnum.SERVICETYPE_L3NM,
'service_endpoint_ids' : [],
'service_constraints': [],
'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_PLANNED},
'service_config': {'config_rules': []},
}
DEVICE1_UUID = 'EMU-1'
DEVICE1_ID = {'device_uuid': {'uuid': DEVICE1_UUID}}
DEVICE1_ENDPOINT_UUID = 'EP100'
DEVICE2_UUID = 'EMU-2'
DEVICE2_ID = {'device_uuid': {'uuid': DEVICE2_UUID}}
DEVICE2_ENDPOINT_UUID = 'EP100'
SERVICE_L3NM_EMU_ENDPOINT_IDS = [
endpoint_id(DEVICE1_ID, DEVICE1_ENDPOINT_UUID),
endpoint_id(DEVICE2_ID, DEVICE2_ENDPOINT_UUID),
]
SERVICE_L3NM_EMU_CONSTRAINTS = [
constraint('latency_ms', 15.2),
constraint('jitter_us', 1.2),
]
SERVICE_L3NM_EMU_CONFIG_RULES = [
config_rule_set('settings', {
'route_distinguisher': '60001:801', 'mtu': 1512, 'address_families': ['IPV4']
}),
config_rule_set('device[{:s}]/endpoint[{:s}]/settings'.format(DEVICE1_UUID, DEVICE1_ENDPOINT_UUID), {
'router_id': '10.0.0.1', 'sub_interface_index': 1,
}),
config_rule_set('device[{:s}]/endpoint[{:s}]/settings'.format(DEVICE2_UUID, DEVICE2_ENDPOINT_UUID), {
'router_id': '10.0.0.2', 'sub_interface_index': 1,
}),
]
import copy, grpc, logging, operator, os, pytest import copy, grpc, logging, os, pytest
from typing import Tuple from typing import Tuple
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from common.tests.PytestGenerateTests import pytest_generate_tests # (required) pylint: disable=unused-import
from context.Config import ( from context.Config import (
GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
...@@ -32,11 +33,7 @@ from service.proto.context_pb2 import Service, ServiceId ...@@ -32,11 +33,7 @@ from service.proto.context_pb2 import Service, ServiceId
from service.service.ServiceService import ServiceService from service.service.ServiceService import ServiceService
from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from service.service.service_handlers import SERVICE_HANDLERS from service.service.service_handlers import SERVICE_HANDLERS
from service.tests.Service_L3NM_EMU import ( from .CommonObjects import CONTEXT, TOPOLOGY
SERVICE_L3NM_EMU, SERVICE_L3NM_EMU_CONFIG_RULES, SERVICE_L3NM_EMU_CONSTRAINTS, SERVICE_L3NM_EMU_ENDPOINT_IDS,
SERVICE_L3NM_EMU_ID)
#from device.service.MonitoringLoops import MonitoringLoops
from .CommonObjects import CONTEXT, DEVICE_EMU1, DEVICE_EMU2, DEVICE_EMU_CONNECT_RULES, TOPOLOGY
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG) LOGGER.setLevel(logging.DEBUG)
...@@ -144,126 +141,130 @@ def grpc_message_to_json_string(message): ...@@ -144,126 +141,130 @@ def grpc_message_to_json_string(message):
return str(MessageToDict( return str(MessageToDict(
message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False))
try:
def test_prepare_environment( from .ServiceHandlersToTest import SERVICE_HANDLERS_TO_TEST
context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name except ImportError:
LOGGER.exception('Unable to load service handlers, nothing will be tested.')
context_client.SetContext(Context(**CONTEXT)) SERVICE_HANDLERS_TO_TEST = []
context_client.SetTopology(Topology(**TOPOLOGY))
class TestServiceHandlers:
DEVICE_EMU1_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU1) scenarios = SERVICE_HANDLERS_TO_TEST
DEVICE_EMU1_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
device_client.AddDevice(Device(**DEVICE_EMU1_WITH_CONNECT_RULES)) def test_prepare_environment(
self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
DEVICE_EMU2_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU2) service_device_uuids, get_device_descriptor, get_connect_rules,
DEVICE_EMU2_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client.AddDevice(Device(**DEVICE_EMU2_WITH_CONNECT_RULES)) device_client : DeviceClient): # pylint: disable=redefined-outer-name
context_client.SetContext(Context(**CONTEXT))
def test_service_create_error_cases( context_client.SetTopology(Topology(**TOPOLOGY))
context_client : ContextClient, # pylint: disable=redefined-outer-name
service_client : ServiceClient, # pylint: disable=redefined-outer-name for device_uuid in service_device_uuids:
service_service : ServiceService): # pylint: disable=redefined-outer-name device_with_connect_rules = copy.deepcopy(get_device_descriptor(device_uuid))
device_with_connect_rules['device_config']['config_rules'].extend(get_connect_rules(device_uuid))
with pytest.raises(grpc.RpcError) as e: device_client.AddDevice(Device(**device_with_connect_rules))
SERVICE_WITH_ENDPOINTS = copy.deepcopy(SERVICE_L3NM_EMU)
SERVICE_WITH_ENDPOINTS['service_endpoint_ids'].extend(SERVICE_L3NM_EMU_ENDPOINT_IDS) def test_service_create_error_cases(
service_client.CreateService(Service(**SERVICE_WITH_ENDPOINTS)) self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT service_device_uuids, get_device_descriptor, get_connect_rules,
msg_head = 'service.service_endpoint_ids([' service_client : ServiceClient): # pylint: disable=redefined-outer-name
msg_tail = ']) is invalid; RPC method CreateService does not accept Endpoints. '\
'Endpoints should be configured after creating the service.' with pytest.raises(grpc.RpcError) as e:
except_msg = str(e.value.details()) service_with_endpoints = copy.deepcopy(service_descriptor)
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) service_with_endpoints['service_endpoint_ids'].extend(service_endpoint_ids)
service_client.CreateService(Service(**service_with_endpoints))
with pytest.raises(grpc.RpcError) as e: assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
SERVICE_WITH_CONFIG_RULES = copy.deepcopy(SERVICE_L3NM_EMU) msg_head = 'service.service_endpoint_ids(['
SERVICE_WITH_CONFIG_RULES['service_config']['config_rules'].extend(SERVICE_L3NM_EMU_CONFIG_RULES) msg_tail = ']) is invalid; RPC method CreateService does not accept Endpoints. '\
service_client.CreateService(Service(**SERVICE_WITH_CONFIG_RULES)) 'Endpoints should be configured after creating the service.'
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT except_msg = str(e.value.details())
msg_head = 'service.service_config.config_rules([' assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)
msg_tail = ']) is invalid; RPC method CreateService does not accept Config Rules. '\
'Config Rules should be configured after creating the service.' with pytest.raises(grpc.RpcError) as e:
except_msg = str(e.value.details()) service_with_config_rules = copy.deepcopy(service_descriptor)
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) service_with_config_rules['service_config']['config_rules'].extend(service_config_rules)
service_client.CreateService(Service(**service_with_config_rules))
with pytest.raises(grpc.RpcError) as e: assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
SERVICE_WITH_CONSTRAINTS = copy.deepcopy(SERVICE_L3NM_EMU) msg_head = 'service.service_config.config_rules(['
SERVICE_WITH_CONSTRAINTS['service_constraints'].extend(SERVICE_L3NM_EMU_CONSTRAINTS) msg_tail = ']) is invalid; RPC method CreateService does not accept Config Rules. '\
service_client.CreateService(Service(**SERVICE_WITH_CONSTRAINTS)) 'Config Rules should be configured after creating the service.'
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT except_msg = str(e.value.details())
msg_head = 'service.service_constraints([' assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)
msg_tail = ']) is invalid; RPC method CreateService does not accept Constraints. '\
'Constraints should be configured after creating the service.' with pytest.raises(grpc.RpcError) as e:
except_msg = str(e.value.details()) service_with_constraints = copy.deepcopy(service_descriptor)
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) service_with_constraints['service_constraints'].extend(service_constraints)
service_client.CreateService(Service(**service_with_constraints))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
def test_service_create_correct( msg_head = 'service.service_constraints(['
context_client : ContextClient, # pylint: disable=redefined-outer-name msg_tail = ']) is invalid; RPC method CreateService does not accept Constraints. '\
service_client : ServiceClient, # pylint: disable=redefined-outer-name 'Constraints should be configured after creating the service.'
service_service : ServiceService): # pylint: disable=redefined-outer-name except_msg = str(e.value.details())
assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)
service_client.CreateService(Service(**SERVICE_L3NM_EMU))
#driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
#assert driver is not None def test_service_create_correct(
self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
service_device_uuids, get_device_descriptor, get_connect_rules,
def test_service_get_created( service_client : ServiceClient): # pylint: disable=redefined-outer-name
context_client : ContextClient, # pylint: disable=redefined-outer-name
service_client : ServiceClient, # pylint: disable=redefined-outer-name service_client.CreateService(Service(**service_descriptor))
service_service : ServiceService): # pylint: disable=redefined-outer-name
service_data = context_client.GetService(ServiceId(**SERVICE_L3NM_EMU_ID)) def test_service_get_created(
LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
service_device_uuids, get_device_descriptor, get_connect_rules,
context_client : ContextClient): # pylint: disable=redefined-outer-name
def test_service_update(
context_client : ContextClient, # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id))
service_client : ServiceClient, # pylint: disable=redefined-outer-name LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data)))
service_service : ServiceService): # pylint: disable=redefined-outer-name
# Configure def test_service_update(
SERVICE_WITH_SETTINGS = copy.deepcopy(SERVICE_L3NM_EMU) self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
SERVICE_WITH_SETTINGS['service_endpoint_ids'].extend(SERVICE_L3NM_EMU_ENDPOINT_IDS) service_device_uuids, get_device_descriptor, get_connect_rules,
SERVICE_WITH_SETTINGS['service_config']['config_rules'].extend(SERVICE_L3NM_EMU_CONFIG_RULES) context_client : ContextClient, # pylint: disable=redefined-outer-name
SERVICE_WITH_SETTINGS['service_constraints'].extend(SERVICE_L3NM_EMU_CONSTRAINTS) service_client : ServiceClient): # pylint: disable=redefined-outer-name
service_client.UpdateService(Service(**SERVICE_WITH_SETTINGS))
# Configure
for endpoint_id in SERVICE_L3NM_EMU_ENDPOINT_IDS: service_with_settings = copy.deepcopy(service_descriptor)
device_id = endpoint_id['device_id'] service_with_settings['service_endpoint_ids'].extend(service_endpoint_ids)
device_data = context_client.GetDevice(DeviceId(**device_id)) service_with_settings['service_config']['config_rules'].extend(service_config_rules)
for i,config_rule in enumerate(device_data.device_config.config_rules): service_with_settings['service_constraints'].extend(service_constraints)
LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format( service_client.UpdateService(Service(**service_with_settings))
str(device_id), i, grpc_message_to_json_string(config_rule)))
for endpoint_id in service_endpoint_ids:
# Deconfigure device_id = endpoint_id['device_id']
SERVICE_WITH_SETTINGS = copy.deepcopy(SERVICE_L3NM_EMU) device_data = context_client.GetDevice(DeviceId(**device_id))
SERVICE_WITH_SETTINGS['service_endpoint_ids'].extend([]) # remove endpoints for i,config_rule in enumerate(device_data.device_config.config_rules):
service_client.UpdateService(Service(**SERVICE_WITH_SETTINGS)) LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format(
str(device_id), i, grpc_message_to_json_string(config_rule)))
for endpoint_id in SERVICE_L3NM_EMU_ENDPOINT_IDS:
device_id = endpoint_id['device_id'] # Deconfigure
device_data = context_client.GetDevice(DeviceId(**device_id)) service_with_settings = copy.deepcopy(service_descriptor)
for i,config_rule in enumerate(device_data.device_config.config_rules): service_with_settings['service_endpoint_ids'].extend([]) # remove endpoints
LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format( service_client.UpdateService(Service(**service_with_settings))
str(device_id), i, grpc_message_to_json_string(config_rule)))
for endpoint_id in service_endpoint_ids:
device_id = endpoint_id['device_id']
def test_service_get_updated( device_data = context_client.GetDevice(DeviceId(**device_id))
context_client : ContextClient, # pylint: disable=redefined-outer-name for i,config_rule in enumerate(device_data.device_config.config_rules):
service_client : ServiceClient, # pylint: disable=redefined-outer-name LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format(
service_service : ServiceService): # pylint: disable=redefined-outer-name str(device_id), i, grpc_message_to_json_string(config_rule)))
service_data = context_client.GetService(ServiceId(**SERVICE_L3NM_EMU_ID))
LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) def test_service_get_updated(
self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
service_device_uuids, get_device_descriptor, get_connect_rules,
def test_service_delete( context_client : ContextClient): # pylint: disable=redefined-outer-name
context_client : ContextClient, # pylint: disable=redefined-outer-name
service_client : ServiceClient, # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id))
service_service : ServiceService): # pylint: disable=redefined-outer-name LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data)))
service_client.DeleteService(ServiceId(**SERVICE_L3NM_EMU_ID))
#driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID, {}) def test_service_delete(
#assert driver is None self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints,
service_device_uuids, get_device_descriptor, get_connect_rules,
service_client : ServiceClient): # pylint: disable=redefined-outer-name
service_client.DeleteService(ServiceId(**service_id))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment