Skip to content
Snippets Groups Projects
test_service.py 12.9 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, pytest
from common.proto.context_pb2 import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Context, ContextId, Device, DeviceId, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.uuids.Service import service_get_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Objects import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONTEXT, CONTEXT_ID, CONTEXT_NAME, DEVICE_R1, DEVICE_R1_ID, SERVICE_R1_R2_NAME, DEVICE_R2, DEVICE_R2_ID,
    SERVICE_R1_R2, SERVICE_R1_R2_ID, TOPOLOGY, TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def _assert_service_id(service_id : ServiceId, context_uuid : str, service_uuid : str) -> None:
    """Assert that a ServiceId message carries the expected context and service UUIDs."""
    assert service_id.context_id.context_uuid.uuid == context_uuid
    assert service_id.service_uuid.uuid == service_uuid


def _assert_service(
    service : Service, context_uuid : str, service_uuid : str, name : str, status : int
) -> None:
    """Assert identity, name, status and the fixed shape of the R1-R2 service used in this test.

    The type (L3NM) and the element counts (2 endpoints, 2 constraints, 3 config rules) are the
    values this test expects both before and after the update; only `name` and `status` vary.
    """
    _assert_service_id(service.service_id, context_uuid, service_uuid)
    assert service.name == name
    assert service.service_type == ServiceTypeEnum.SERVICETYPE_L3NM
    assert len(service.service_endpoint_ids) == 2
    assert len(service.service_constraints) == 2
    assert service.service_status.service_status == status
    assert len(service.service_config.config_rules) == 3


def _assert_no_services(context_client : ContextClient) -> None:
    """Assert the test context lists exactly one topology and no services or slices."""
    context_id = ContextId(**CONTEXT_ID)

    response = context_client.GetContext(context_id)
    assert len(response.topology_ids) == 1
    assert len(response.service_ids) == 0
    assert len(response.slice_ids) == 0

    response = context_client.ListServiceIds(context_id)
    assert len(response.service_ids) == 0

    response = context_client.ListServices(context_id)
    assert len(response.services) == 0


def _assert_single_service_listed(
    context_client : ContextClient, context_uuid : str, service_uuid : str, name : str, status : int
) -> None:
    """Assert ListServiceIds and ListServices each return exactly the expected service."""
    context_id = ContextId(**CONTEXT_ID)

    response = context_client.ListServiceIds(context_id)
    assert len(response.service_ids) == 1
    _assert_service_id(response.service_ids[0], context_uuid, service_uuid)

    response = context_client.ListServices(context_id)
    assert len(response.services) == 1
    _assert_service(response.services[0], context_uuid, service_uuid, name, status)


@pytest.mark.depends(on=['context/tests/test_link.py::test_link'])
def test_service(context_client : ContextClient) -> None:
    """Exercise the Service RPCs of the Context client: get/list/create/update/remove.

    Steps: create the context, topology and two devices the service refers to; check that the
    service is reported as missing; reject a service whose endpoint points to another context;
    create, read back, update and remove the service; finally remove the dependencies.

    The EventsCollector checks are kept below as comments; they are currently disabled.
    """

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    #events_collector = EventsCollector(
    #    context_client, log_events_received=True,
    #    activate_context_collector = False, activate_topology_collector = False, activate_device_collector = False,
    #    activate_link_collector = False, activate_service_collector = True, activate_slice_collector = False,
    #    activate_connection_collector = False)
    #events_collector.start()

    # ----- Prepare dependencies for the test and capture related events -----------------------------------------------
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))
    context_client.SetDevice(Device(**DEVICE_R1))
    context_client.SetDevice(Device(**DEVICE_R2))

    # events = events_collector.get_events(block=True, count=4)
    # assert isinstance(events[0], ContextEvent)
    # assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    # assert events[0].context_id.context_uuid.uuid == context_uuid
    # assert isinstance(events[1], TopologyEvent)
    # assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    # assert events[1].topology_id.context_id.context_uuid.uuid == context_uuid
    # assert events[1].topology_id.topology_uuid.uuid == topology_uuid
    # assert isinstance(events[2], DeviceEvent)
    # assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    # assert events[2].device_id.device_uuid.uuid == device_r1_uuid
    # assert isinstance(events[3], DeviceEvent)
    # assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    # assert events[3].device_id.device_uuid.uuid == device_r2_uuid

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    service_id = ServiceId(**SERVICE_R1_R2_ID)
    # Compute the UUIDs the test expects the server to report for this service.
    context_uuid,service_uuid = service_get_uuid(service_id, allow_random=False)
    with pytest.raises(grpc.RpcError) as e:
        context_client.GetService(service_id)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    MSG = 'Service({:s}/{:s}) not found; context_uuid generated was: {:s}; service_uuid generated was: {:s}'
    assert e.value.details() == MSG.format(CONTEXT_NAME, SERVICE_R1_R2_NAME, context_uuid, service_uuid)

    # ----- List when the object does not exist ------------------------------------------------------------------------
    _assert_no_services(context_client)

    # ----- Create the object ------------------------------------------------------------------------------------------
    # Build the invalid request outside pytest.raises so that only the RPC itself is expected to fail.
    WRONG_UUID = 'ffffffff-ffff-ffff-ffff-ffffffffffff'
    WRONG_SERVICE = copy.deepcopy(SERVICE_R1_R2)
    WRONG_SERVICE['service_endpoint_ids'][0]['topology_id']['context_id']['context_uuid']['uuid'] = WRONG_UUID
    with pytest.raises(grpc.RpcError) as e:
        context_client.SetService(Service(**WRONG_SERVICE))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    MSG = 'request.service_endpoint_ids[0].topology_id.context_id.context_uuid.uuid({}) is invalid; '\
          'should be == request.service_id.context_id.context_uuid.uuid({})'
    # The error message echoes the context UUID exactly as sent in the request.
    raw_context_uuid = service_id.context_id.context_uuid.uuid # pylint: disable=no-member
    assert e.value.details() == MSG.format(WRONG_UUID, raw_context_uuid)

    response = context_client.SetService(Service(**SERVICE_R1_R2))
    _assert_service_id(response, context_uuid, service_uuid)

    # ----- Check create event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True)
    #assert isinstance(event, ServiceEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert event.service_id.context_id.context_uuid.uuid == context_uuid
    #assert event.service_id.service_uuid.uuid == service_uuid

    # ----- Get when the object exists ---------------------------------------------------------------------------------
    response = context_client.GetContext(ContextId(**CONTEXT_ID))
    assert response.context_id.context_uuid.uuid == context_uuid
    assert response.name == CONTEXT_NAME
    assert len(response.topology_ids) == 1
    assert len(response.service_ids) == 1
    _assert_service_id(response.service_ids[0], context_uuid, service_uuid)
    assert len(response.slice_ids) == 0

    response = context_client.GetService(ServiceId(**SERVICE_R1_R2_ID))
    _assert_service(
        response, context_uuid, service_uuid, SERVICE_R1_R2_NAME, ServiceStatusEnum.SERVICESTATUS_PLANNED)

    # ----- List when the object exists --------------------------------------------------------------------------------
    _assert_single_service_listed(
        context_client, context_uuid, service_uuid, SERVICE_R1_R2_NAME, ServiceStatusEnum.SERVICESTATUS_PLANNED)

    # ----- Update the object ------------------------------------------------------------------------------------------
    new_service_name = 'new'
    SERVICE_UPDATED = copy.deepcopy(SERVICE_R1_R2)
    SERVICE_UPDATED['name'] = new_service_name
    SERVICE_UPDATED['service_status']['service_status'] = ServiceStatusEnum.SERVICESTATUS_ACTIVE
    response = context_client.SetService(Service(**SERVICE_UPDATED))
    _assert_service_id(response, context_uuid, service_uuid)

    # ----- Check update event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True)
    #assert isinstance(event, ServiceEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    #assert event.service_id.context_id.context_uuid.uuid == context_uuid
    #assert event.service_id.service_uuid.uuid == service_uuid

    # ----- Get when the object is modified ----------------------------------------------------------------------------
    response = context_client.GetService(ServiceId(**SERVICE_R1_R2_ID))
    _assert_service(
        response, context_uuid, service_uuid, new_service_name, ServiceStatusEnum.SERVICESTATUS_ACTIVE)

    # ----- List when the object is modified ---------------------------------------------------------------------------
    _assert_single_service_listed(
        context_client, context_uuid, service_uuid, new_service_name, ServiceStatusEnum.SERVICESTATUS_ACTIVE)

    # ----- Remove the object ------------------------------------------------------------------------------------------
    context_client.RemoveService(ServiceId(**SERVICE_R1_R2_ID))

    # ----- Check remove event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True)
    #assert isinstance(event, ServiceEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert event.service_id.context_id.context_uuid.uuid == context_uuid
    #assert event.service_id.service_uuid.uuid == service_uuid

    # ----- List after deleting the object -----------------------------------------------------------------------------
    _assert_no_services(context_client)

    # ----- Clean dependencies used in the test and capture related events ---------------------------------------------
    context_client.RemoveDevice(DeviceId(**DEVICE_R1_ID))
    context_client.RemoveDevice(DeviceId(**DEVICE_R2_ID))
    context_client.RemoveTopology(TopologyId(**TOPOLOGY_ID))
    context_client.RemoveContext(ContextId(**CONTEXT_ID))

    #events = events_collector.get_events(block=True, count=4)
    #assert isinstance(events[0], DeviceEvent)
    #assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[0].device_id.device_uuid.uuid == device_r1_uuid
    #assert isinstance(events[1], DeviceEvent)
    #assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[1].device_id.device_uuid.uuid == device_r2_uuid
    #assert isinstance(events[2], TopologyEvent)
    #assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[2].topology_id.context_id.context_uuid.uuid == context_uuid
    #assert events[2].topology_id.topology_uuid.uuid == topology_uuid
    #assert isinstance(events[3], ContextEvent)
    #assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[3].context_id.context_uuid.uuid == context_uuid

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
    #events_collector.stop()