Skip to content
Snippets Groups Projects
test_link.py 10.2 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import copy, grpc, pytest, time
from common.proto.context_pb2 import (
    Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, Empty, EventTypeEnum, Link, LinkEvent, LinkId,
    Topology, TopologyEvent, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.uuids.Link import link_get_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Objects import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID, DEVICE_R2, DEVICE_R2_ID, LINK_R1_R2, LINK_R1_R2_ID, LINK_R1_R2_NAME,
    TOPOLOGY, TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.mark.depends(on=['context/tests/test_device.py::test_device'])
def test_link(context_client : ContextClient) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsCollector(
        context_client, log_events_received=True,
        activate_context_collector = True, activate_topology_collector = True, activate_device_collector = True,
        activate_link_collector = True, activate_service_collector = False, activate_slice_collector = False,
        activate_connection_collector = False)
    events_collector.start()
    time.sleep(3)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Prepare dependencies for the test and capture related events -----------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetContext(Context(**CONTEXT))
    context_uuid = response.context_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetTopology(Topology(**TOPOLOGY))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_id.context_uuid.uuid == context_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    topology_uuid = response.topology_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetDevice(Device(**DEVICE_R1))
    device_r1_uuid = response.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetDevice(Device(**DEVICE_R2))
    device_r2_uuid = response.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events = events_collector.get_events(block=True, count=4, timeout=1.0)
    assert isinstance(events[0], ContextEvent)
    assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[0].context_id.context_uuid.uuid == context_uuid
    assert isinstance(events[1], TopologyEvent)
    assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[1].topology_id.context_id.context_uuid.uuid == context_uuid
    assert events[1].topology_id.topology_uuid.uuid == topology_uuid
    assert isinstance(events[2], DeviceEvent)
    assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[2].device_id.device_uuid.uuid == device_r1_uuid
    assert isinstance(events[3], DeviceEvent)
    assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[3].device_id.device_uuid.uuid == device_r2_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Get when the object does not exist -------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    link_id = LinkId(**LINK_R1_R2_ID)
    link_uuid = link_get_uuid(link_id, allow_random=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client.GetLink(link_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    MSG = 'Link({:s}) not found; link_uuid generated was: {:s}'
    assert e.value.details() == MSG.format(LINK_R1_R2_NAME, link_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinkIds(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.link_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinks(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.links) == 0

    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetLink(Link(**LINK_R1_R2))
    assert response.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Check create event -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    event = events_collector.get_event(block=True, timeout=1.0)
    assert isinstance(event, LinkEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.link_id.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Get when the object exists ---------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.GetLink(LinkId(**LINK_R1_R2_ID))
    assert response.link_id.link_uuid.uuid == link_uuid
    assert response.name == LINK_R1_R2_NAME
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.link_endpoint_ids) == 2

    # ----- List when the object exists --------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinkIds(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.link_ids) == 1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.link_ids[0].link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinks(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.links) == 1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.links[0].link_id.link_uuid.uuid == link_uuid
    assert response.links[0].name == LINK_R1_R2_NAME
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.links[0].link_endpoint_ids) == 2

    # ----- Update the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    new_link_name = 'new'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LINK_UPDATED = copy.deepcopy(LINK_R1_R2)
    LINK_UPDATED['name'] = new_link_name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.SetLink(Link(**LINK_UPDATED))
    assert response.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Check update event -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    event = events_collector.get_event(block=True, timeout=1.0)
    assert isinstance(event, LinkEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert event.link_id.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Get when the object is modified ----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.GetLink(LinkId(**LINK_R1_R2_ID))
    assert response.link_id.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.name == new_link_name
    assert len(response.link_endpoint_ids) == 2

    # ----- List when the object is modified ---------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinkIds(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.link_ids) == 1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.link_ids[0].link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.ListLinks(Empty())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.links) == 1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.links[0].link_id.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.links[0].name == new_link_name
    assert len(response.links[0].link_endpoint_ids) == 2

    # ----- Check relation was created ---------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID))
    assert response.topology_id.context_id.context_uuid.uuid == context_uuid
    assert response.topology_id.topology_uuid.uuid == topology_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.device_ids) == 2
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.device_ids[0].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert response.device_ids[1].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.link_ids) == 1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.link_ids[0].link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Remove the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_client.RemoveLink(LinkId(**LINK_R1_R2_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Check remove event -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    event = events_collector.get_event(block=True, timeout=1.0)
    assert isinstance(event, LinkEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert event.link_id.link_uuid.uuid == link_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- List after deleting the object -----------------------------------------------------------------------------
    response = context_client.ListLinkIds(Empty())
    assert len(response.link_ids) == 0

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 0

    response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID))
    assert response.topology_id.context_id.context_uuid.uuid == context_uuid
    assert response.topology_id.topology_uuid.uuid == topology_uuid
    assert len(response.device_ids) == 2
    assert response.device_ids[0].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert response.device_ids[1].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert len(response.link_ids) == 0

    # ----- Clean dependencies used in the test and capture related events ---------------------------------------------
    context_client.RemoveDevice(DeviceId(**DEVICE_R1_ID))
    context_client.RemoveDevice(DeviceId(**DEVICE_R2_ID))
    context_client.RemoveTopology(TopologyId(**TOPOLOGY_ID))
    context_client.RemoveContext(ContextId(**CONTEXT_ID))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events = events_collector.get_events(block=True, count=4, timeout=1.0)
    assert isinstance(events[0], DeviceEvent)
    assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[0].device_id.device_uuid.uuid == device_r1_uuid
    assert isinstance(events[1], DeviceEvent)
    assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[1].device_id.device_uuid.uuid == device_r2_uuid
    assert isinstance(events[2], TopologyEvent)
    assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[2].topology_id.context_id.context_uuid.uuid == context_uuid
    assert events[2].topology_id.topology_uuid.uuid == topology_uuid
    assert isinstance(events[3], ContextEvent)
    assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[3].context_id.context_uuid.uuid == context_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector.stop()