# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, logging, pytest
from typing import Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import (
    Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EventTypeEnum, Service, ServiceEvent, ServiceId,
    ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from .Objects import (
    CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID, DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID,
    SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, TOPOLOGY, TOPOLOGY_ID)

# NOTE(review): `Database` and `MessageBroker` are used below but are not imported in the visible part of this
# file; add the proper project imports for them -- their module paths cannot be determined from here.

LOGGER = logging.getLogger(__name__)


def _dump_database(database):
    """Log every entry returned by ``database.dump_all()`` and return the entries.

    Shared by the three database checkpoints of ``grpc_service`` so they all dump through the same handle and
    log in the same format.
    """
    db_entries = database.dump_all()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(db_entry)
    LOGGER.info('-----------------------------------------------------------')
    return db_entries


def grpc_service(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]):    # pylint: disable=redefined-outer-name
    """Exercise the Service RPCs of the Context component through its gRPC client.

    The scenario creates a context, a topology and two devices, then checks get/list on a missing service,
    rejection of an inconsistent service, creation, update, get/list on the existing service, and finally
    removal of every object created. Events and database dumps are checked along the way.

    Args:
        context_client_grpc: client used to issue every RPC and to feed the EventsCollector.
        context_db_mb: tuple whose first element is handed to ``Database(...)``; the second element is not
            used by this function.

    Raises:
        AssertionError: when a response, event or database dump differs from the expected one.
    """
    Session = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    database = Database(Session)
    database.clear()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # The collector is stopped in the `finally` below so that a failing assertion does not leave it running.
    try:
        # ----- Prepare dependencies for the test and capture related events -------------------------------------------
        response = context_client_grpc.SetContext(Context(**CONTEXT))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
        assert response.device_uuid.uuid == DEVICE_R1_UUID

        response = context_client_grpc.SetDevice(Device(**DEVICE_R2))
        assert response.device_uuid.uuid == DEVICE_R2_UUID

        # NOTE(review): the checks on the four setup events are disabled, so those events (if the service emits
        # them) are never drained and would be returned first by the event checks further down -- confirm.
        # events = events_collector.get_events(block=True, count=4)
        #
        # assert isinstance(events[0], ContextEvent)
        # assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        # assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        #
        # assert isinstance(events[1], TopologyEvent)
        # assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        # assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        # assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
        #
        # assert isinstance(events[2], DeviceEvent)
        # assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        # assert events[2].device_id.device_uuid.uuid == DEVICE_R1_UUID
        #
        # assert isinstance(events[3], DeviceEvent)
        # assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        # assert events[3].device_id.device_uuid.uuid == DEVICE_R2_UUID
        LOGGER.info('----------------')

        # ----- Get when the object does not exist ---------------------------------------------------------------------
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.GetService(ServiceId(**SERVICE_R1_R2_ID))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == 'Service({:s}) not found'.format(SERVICE_R1_R2_UUID)
        LOGGER.info('----------------')

        # ----- List when the object does not exist --------------------------------------------------------------------
        response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
        assert len(response.service_ids) == 0
        LOGGER.info('----------------')

        response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
        assert len(response.services) == 0
        LOGGER.info('----------------')

        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = _dump_database(database)
        assert len(db_entries) == 80

        # ----- Create the object --------------------------------------------------------------------------------------
        # A service whose first endpoint points to a context other than the service's own one must be rejected.
        wrong_context_uuid = 'ca1ea172-728f-441d-972c-feeae8c9bffc'
        WRONG_SERVICE = copy.deepcopy(SERVICE_R1_R2)
        WRONG_SERVICE['service_endpoint_ids'][0]\
            ['topology_id']['context_id']['context_uuid']['uuid'] = wrong_context_uuid
        # Only the call expected to fail sits inside the `raises` block.
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.SetService(Service(**WRONG_SERVICE))
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        msg = 'request.service_endpoint_ids[0].topology_id.context_id.context_uuid.uuid({:s}) is invalid; '\
              'should be == request.service_id.context_id.context_uuid.uuid({:s})'.format(
                  wrong_context_uuid, DEFAULT_CONTEXT_UUID)
        assert e.value.details() == msg

        response = context_client_grpc.SetService(Service(**SERVICE_R1_R2))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.service_uuid.uuid == SERVICE_R1_R2_UUID

        CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
        CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_R1_R2_ID)
        response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Check create event -------------------------------------------------------------------------------------
        events = events_collector.get_events(block=True, count=2)

        assert isinstance(events[0], ServiceEvent)
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[0].service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

        assert isinstance(events[1], ContextEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Update the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetService(Service(**SERVICE_R1_R2))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.service_uuid.uuid == SERVICE_R1_R2_UUID

        # ----- Check update event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, ServiceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert event.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert event.service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

        # ----- Dump state of database after create/update the object --------------------------------------------------
        # Dumped through `database`, the handle created above (`context_database` was never defined here).
        db_entries = _dump_database(database)
        assert len(db_entries) == 108

        # ----- Get when the object exists -----------------------------------------------------------------------------
        response = context_client_grpc.GetService(ServiceId(**SERVICE_R1_R2_ID))
        assert response.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.service_id.service_uuid.uuid == SERVICE_R1_R2_UUID
        assert response.service_type == ServiceTypeEnum.SERVICETYPE_L3NM
        assert len(response.service_endpoint_ids) == 2
        assert len(response.service_constraints) == 2
        assert response.service_status.service_status == ServiceStatusEnum.SERVICESTATUS_PLANNED
        assert len(response.service_config.config_rules) == 3

        # ----- List when the object exists ----------------------------------------------------------------------------
        response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
        assert len(response.service_ids) == 1
        assert response.service_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.service_ids[0].service_uuid.uuid == SERVICE_R1_R2_UUID

        response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
        assert len(response.services) == 1
        service = response.services[0]
        assert service.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert service.service_id.service_uuid.uuid == SERVICE_R1_R2_UUID
        assert service.service_type == ServiceTypeEnum.SERVICETYPE_L3NM
        assert len(service.service_endpoint_ids) == 2
        assert len(service.service_constraints) == 2
        assert service.service_status.service_status == ServiceStatusEnum.SERVICESTATUS_PLANNED
        assert len(service.service_config.config_rules) == 3

        # ----- Remove the object --------------------------------------------------------------------------------------
        context_client_grpc.RemoveService(ServiceId(**SERVICE_R1_R2_ID))
        context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
        context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R2_ID))
        context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
        context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

        # ----- Check remove event -------------------------------------------------------------------------------------
        events = events_collector.get_events(block=True, count=5)

        assert isinstance(events[0], ServiceEvent)
        # Event type is checked here too, in line with the other four removal events below.
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[0].service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

        assert isinstance(events[1], DeviceEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[1].device_id.device_uuid.uuid == DEVICE_R1_UUID

        assert isinstance(events[2], DeviceEvent)
        assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[2].device_id.device_uuid.uuid == DEVICE_R2_UUID

        assert isinstance(events[3], TopologyEvent)
        assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[3].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[3].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        assert isinstance(events[4], ContextEvent)
        assert events[4].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[4].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    finally:
        # ----- Stop the EventsCollector -------------------------------------------------------------------------------
        events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    db_entries = _dump_database(database)
    assert len(db_entries) == 0