# test_link.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, pytest #, time
from common.proto.context_pb2 import Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Topology, TopologyId
#from common.proto.context_pb2 import ContextEvent, DeviceEvent, EventTypeEnum, LinkEvent, TopologyEvent
from context.client.ContextClient import ContextClient
#from context.client.EventsCollector import EventsCollector
from context.service.database.uuids.Link import link_get_uuid
#from .Constants import GET_EVENTS_TIMEOUT
from .Objects import (
    CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID, DEVICE_R2, DEVICE_R2_ID, LINK_R1_R2, LINK_R1_R2_ID, LINK_R1_R2_NAME,
    TOPOLOGY, TOPOLOGY_ID)

@pytest.mark.depends(on=['context/tests/test_device.py::test_device'])
def test_link(context_client : ContextClient) -> None:
    """Exercise the create / read / update / delete cycle of a Link through the Context client.

    Steps performed, in order:
      1. Create the prerequisites: a context, a topology and two devices (R1 and R2).
      2. Check that getting the not-yet-created link fails with NOT_FOUND and that both link listings are empty.
      3. Create the link and check it through GetLink, ListLinkIds and ListLinks.
      4. Rename the link through a second SetLink and check the new name is reported and the UUID is unchanged.
      5. Check that the topology reports the two devices and the link.
      6. Remove the link and check the listings and the topology no longer report it.
      7. Remove the devices, the topology and the context.

    The event-related checks are kept as commented-out code; the EventsCollector is not started by this test.

    Args:
        context_client: client used to issue every request to the Context service (provided by the test setup).
    """

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    #events_collector = EventsCollector(
    #    context_client, log_events_received=True,
    #    activate_context_collector = True, activate_topology_collector = True, activate_device_collector = True,
    #    activate_link_collector = True, activate_service_collector = False, activate_slice_collector = False,
    #    activate_connection_collector = False)
    #events_collector.start()
    #time.sleep(3)

    # ----- Prepare dependencies for the test and capture related events -----------------------------------------------
    response = context_client.SetContext(Context(**CONTEXT))
    context_uuid = response.context_uuid.uuid

    response = context_client.SetTopology(Topology(**TOPOLOGY))
    assert response.context_id.context_uuid.uuid == context_uuid
    topology_uuid = response.topology_uuid.uuid

    response = context_client.SetDevice(Device(**DEVICE_R1))
    device_r1_uuid = response.device_uuid.uuid

    response = context_client.SetDevice(Device(**DEVICE_R2))
    device_r2_uuid = response.device_uuid.uuid

    #events = events_collector.get_events(block=True, count=4, timeout=GET_EVENTS_TIMEOUT)
    #assert isinstance(events[0], ContextEvent)
    #assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert events[0].context_id.context_uuid.uuid == context_uuid
    #assert isinstance(events[1], TopologyEvent)
    #assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert events[1].topology_id.context_id.context_uuid.uuid == context_uuid
    #assert events[1].topology_id.topology_uuid.uuid == topology_uuid
    #assert isinstance(events[2], DeviceEvent)
    #assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert events[2].device_id.device_uuid.uuid == device_r1_uuid
    #assert isinstance(events[3], DeviceEvent)
    #assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert events[3].device_id.device_uuid.uuid == device_r2_uuid

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    link_id = LinkId(**LINK_R1_R2_ID)
    # The UUID is computed locally from the link id so it can be compared against the values the service reports.
    link_uuid = link_get_uuid(link_id, allow_random=False)
    with pytest.raises(grpc.RpcError) as e:
        context_client.GetLink(link_id)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    MSG = 'Link({:s}) not found; link_uuid generated was: {:s}'
    assert e.value.details() == MSG.format(LINK_R1_R2_NAME, link_uuid)

    # ----- List when the object does not exist ------------------------------------------------------------------------
    response = context_client.ListLinkIds(Empty())
    assert len(response.link_ids) == 0

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 0

    # ----- Create the object ------------------------------------------------------------------------------------------
    response = context_client.SetLink(Link(**LINK_R1_R2))
    assert response.link_uuid.uuid == link_uuid

    # ----- Check create event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True, timeout=GET_EVENTS_TIMEOUT)
    #assert isinstance(event, LinkEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    #assert event.link_id.link_uuid.uuid == link_uuid

    # ----- Get when the object exists ---------------------------------------------------------------------------------
    response = context_client.GetLink(LinkId(**LINK_R1_R2_ID))
    assert response.link_id.link_uuid.uuid == link_uuid
    assert response.name == LINK_R1_R2_NAME
    assert len(response.link_endpoint_ids) == 2

    # ----- List when the object exists --------------------------------------------------------------------------------
    response = context_client.ListLinkIds(Empty())
    assert len(response.link_ids) == 1
    assert response.link_ids[0].link_uuid.uuid == link_uuid

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 1
    assert response.links[0].link_id.link_uuid.uuid == link_uuid
    assert response.links[0].name == LINK_R1_R2_NAME
    assert len(response.links[0].link_endpoint_ids) == 2

    # ----- Update the object ------------------------------------------------------------------------------------------
    new_link_name = 'new'
    # Deep copy so the shared LINK_R1_R2 definition is not mutated by the rename.
    LINK_UPDATED = copy.deepcopy(LINK_R1_R2)
    LINK_UPDATED['name'] = new_link_name
    response = context_client.SetLink(Link(**LINK_UPDATED))
    assert response.link_uuid.uuid == link_uuid

    # ----- Check update event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True, timeout=GET_EVENTS_TIMEOUT)
    #assert isinstance(event, LinkEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    #assert event.link_id.link_uuid.uuid == link_uuid

    # ----- Get when the object is modified ----------------------------------------------------------------------------
    response = context_client.GetLink(LinkId(**LINK_R1_R2_ID))
    assert response.link_id.link_uuid.uuid == link_uuid
    assert response.name == new_link_name
    assert len(response.link_endpoint_ids) == 2

    # ----- List when the object is modified ---------------------------------------------------------------------------
    response = context_client.ListLinkIds(Empty())
    assert len(response.link_ids) == 1
    assert response.link_ids[0].link_uuid.uuid == link_uuid

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 1
    assert response.links[0].link_id.link_uuid.uuid == link_uuid
    assert response.links[0].name == new_link_name
    assert len(response.links[0].link_endpoint_ids) == 2

    # ----- Check relation was created ---------------------------------------------------------------------------------
    response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID))
    assert response.topology_id.context_id.context_uuid.uuid == context_uuid
    assert response.topology_id.topology_uuid.uuid == topology_uuid
    assert len(response.device_ids) == 2
    assert response.device_ids[0].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert response.device_ids[1].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert len(response.link_ids) == 1
    assert response.link_ids[0].link_uuid.uuid == link_uuid

    # ----- Remove the object ------------------------------------------------------------------------------------------
    context_client.RemoveLink(LinkId(**LINK_R1_R2_ID))

    # ----- Check remove event -----------------------------------------------------------------------------------------
    #event = events_collector.get_event(block=True, timeout=GET_EVENTS_TIMEOUT)
    #assert isinstance(event, LinkEvent)
    #assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert event.link_id.link_uuid.uuid == link_uuid

    # ----- List after deleting the object -----------------------------------------------------------------------------
    response = context_client.ListLinkIds(Empty())
    assert len(response.link_ids) == 0

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 0

    response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID))
    assert response.topology_id.context_id.context_uuid.uuid == context_uuid
    assert response.topology_id.topology_uuid.uuid == topology_uuid
    assert len(response.device_ids) == 2
    assert response.device_ids[0].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert response.device_ids[1].device_uuid.uuid in {device_r1_uuid, device_r2_uuid}
    assert len(response.link_ids) == 0

    # ----- Clean dependencies used in the test and capture related events ---------------------------------------------
    context_client.RemoveDevice(DeviceId(**DEVICE_R1_ID))
    context_client.RemoveDevice(DeviceId(**DEVICE_R2_ID))
    context_client.RemoveTopology(TopologyId(**TOPOLOGY_ID))
    context_client.RemoveContext(ContextId(**CONTEXT_ID))

    #events = events_collector.get_events(block=True, count=4, timeout=GET_EVENTS_TIMEOUT)
    #assert isinstance(events[0], DeviceEvent)
    #assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[0].device_id.device_uuid.uuid == device_r1_uuid
    #assert isinstance(events[1], DeviceEvent)
    #assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[1].device_id.device_uuid.uuid == device_r2_uuid
    #assert isinstance(events[2], TopologyEvent)
    #assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[2].topology_id.context_id.context_uuid.uuid == context_uuid
    #assert events[2].topology_id.topology_uuid.uuid == topology_uuid
    #assert isinstance(events[3], ContextEvent)
    #assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    #assert events[3].context_id.context_uuid.uuid == context_uuid

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
    #events_collector.stop()