Skip to content
Snippets Groups Projects
test_unitary.py 18 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Javi Moreno's avatar
Javi Moreno committed
import os
Javi Moreno's avatar
Javi Moreno committed
import pytest
Javi Moreno's avatar
Javi Moreno committed

from monitoring.client.monitoring_client import MonitoringClient
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, DEVICE_GRPC_SERVICE_PORT
from monitoring.proto import context_pb2, monitoring_pb2
from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType
Javi Moreno's avatar
Javi Moreno committed
from monitoring.service import SqliteTools, InfluxTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector

from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker

from context.Config import (
    GRPC_SERVICE_PORT as grpc_port_context,
    GRPC_MAX_WORKERS as grpc_workers_context,
    GRPC_GRACE_PERIOD as grpc_grace_context
)
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device
from context.tests.Objects import (DEVICE_R1, DEVICE_R1_UUID)

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Setup
###########################
SERVER_ADDRESS = '127.0.0.1'
LISTEN_ADDRESS = '[::]'
GRPC_PORT_MONITORING = 7070

GRPC_PORT_CONTEXT    = 10000 + grpc_port_context    # avoid privileged ports
DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports
MONITORING_GRPC_SERVICE_PORT = GRPC_PORT_MONITORING # avoid privileged ports


SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
]

Javi Moreno's avatar
Javi Moreno committed
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT")
Javi Moreno's avatar
Javi Moreno committed
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")

@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    name,db_backend,db_settings,mb_backend,mb_settings = request.param
    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    _database = Database(get_database_backend(backend=db_backend, **db_settings))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _database, _message_broker
    _message_broker.terminate()

@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    database = context_db_mb[0]
    database.clear_all()
    _service = ContextService(
        database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context,
        grace_period=grpc_grace_context)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    _client = ContextClient(address='localhost', port=GRPC_PORT_CONTEXT)
    yield _client
    _client.close()


# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service():
    LOGGER.warning('monitoring_service begin')

    service_port    = GRPC_SERVICE_PORT
    max_workers     = GRPC_MAX_WORKERS
    grace_period    = GRPC_GRACE_PERIOD
    LOGGER.info('Initializing MonitoringService...')
    _service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.warning('monitoring_service yielding')

    LOGGER.info('Terminating MonitoringService...')

# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service):
    LOGGER.warning('monitoring_client begin')
    client = MonitoringClient(server=SERVER_ADDRESS, port=GRPC_PORT_MONITORING)  # instantiate the client
    LOGGER.warning('monitoring_client returning')
    return client

Javi Moreno's avatar
Javi Moreno committed
# This fixture will be requested by test cases and last during testing session.
@pytest.fixture(scope='session')
def kpi():
    LOGGER.warning('test_include_kpi begin')
    # form request
    kpi                     = monitoring_pb2.Kpi()
    kpi.kpi_id.kpi_id.uuid  = 'KPIID0000'
    kpi.kpiDescription      = 'KPI Desc'
Javi Moreno's avatar
Javi Moreno committed
    return kpi

@pytest.fixture(scope='session')
def kpi_id():
    LOGGER.warning('test_include_kpi begin')

    # form request
    kpi_id              = monitoring_pb2.KpiId()
    kpi_id.kpi_id.uuid  = str(1)
Javi Moreno's avatar
Javi Moreno committed
@pytest.fixture(scope='session')
def sql_db():
    sql_db = SqliteTools.SQLite('monitoring.db')
    return sql_db

@pytest.fixture(scope='session')
def influx_db():
    influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
Javi Moreno's avatar
Javi Moreno committed
    return influx_db

def create_kpi_request():
    create_kpi_request                                  = monitoring_pb2.KpiDescriptor()
    create_kpi_request.kpi_description                  = 'KPI Description Test'
    create_kpi_request.kpi_sample_type                  = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
    create_kpi_request.device_id.device_uuid.uuid       = 'DEV1'  # pylint: disable=maybe-no-member
    create_kpi_request.service_id.service_uuid.uuid     = "SERV1"
    create_kpi_request.endpoint_id.endpoint_uuid.uuid   = "END1"

    return create_kpi_request

@pytest.fixture(scope='session')
def monitor_kpi_request():
    LOGGER.warning('test_monitor_kpi begin')

    monitor_kpi_request                     = monitoring_pb2.MonitorKpiRequest()
    monitor_kpi_request.kpi_id.kpi_id.uuid  = str(1)
    monitor_kpi_request.sampling_duration_s = 120
    monitor_kpi_request.sampling_interval_s = 5
Javi Moreno's avatar
Javi Moreno committed
@pytest.fixture(scope='session')
def include_kpi_request():
    LOGGER.warning('test_include_kpi begin')

    include_kpi_request                     = monitoring_pb2.Kpi()
    include_kpi_request.kpi_id.kpi_id.uuid  = str(1)
    include_kpi_request.timestamp           = "2021-10-12T13:14:42Z"
    include_kpi_request.kpi_value.intVal    = 500
Javi Moreno's avatar
Javi Moreno committed

    return include_kpi_request

@pytest.fixture(scope='session')
def address():
    address = '127.0.0.1'
    return address

@pytest.fixture(scope='session')
def port():
    port = 7070
    return port

Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_create_kpi(monitoring_client,create_kpi_request):
    LOGGER.warning('test_create_kpi requesting')
    response = monitoring_client.CreateKpi(create_kpi_request)
    assert isinstance(response, monitoring_pb2.KpiId)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(monitoring_client,create_kpi_request):
    LOGGER.warning('test_monitor_kpi begin')

    response = monitoring_client.CreateKpi(create_kpi_request)

    monitor_kpi_request                     = monitoring_pb2.MonitorKpiRequest()
    monitor_kpi_request.kpi_id.kpi_id.uuid  = response.kpi_id.uuid
    monitor_kpi_request.sampling_duration_s = 120
    monitor_kpi_request.sampling_interval_s = 5

    response = monitoring_client.MonitorKpi(monitor_kpi_request)
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)


# Test case that makes use of client fixture to test server's IncludeKpi method
Javi Moreno's avatar
Javi Moreno committed
def test_include_kpi(monitoring_client,include_kpi_request):
    # make call to server
    LOGGER.warning('test_include_kpi requesting')
Javi Moreno's avatar
Javi Moreno committed
    response = monitoring_client.IncludeKpi(include_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)

# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client,include_kpi_request):
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(kpi)
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.debug(str(response))
Javi Moreno's avatar
 
Javi Moreno committed
    #assert isinstance(response, monitoring_pb2.Kpi)

# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_instant_kpi(monitoring_client,kpi_id):
    LOGGER.warning('test_getinstant_kpi begin')
    response = monitoring_client.GetInstantKpi(kpi_id)
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.Kpi)
Javi Moreno's avatar
 
Javi Moreno committed

# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_kpidescritor_kpi(monitoring_client,create_kpi_request):
Javi Moreno's avatar
 
Javi Moreno committed
    LOGGER.warning('test_getkpidescritor_kpi begin')

    response = monitoring_client.CreateKpi(create_kpi_request)

    response = monitoring_client.GetKpiDescriptor(response)
Javi Moreno's avatar
 
Javi Moreno committed
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiDescriptor)

Javi Moreno's avatar
Javi Moreno committed
def test_sqlitedb_tools_insert_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')

    kpi_description = create_kpi_request.kpi_description
    kpi_sample_type = create_kpi_request.kpi_sample_type
    kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
    kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
    kpi_service_id = create_kpi_request.service_id.service_uuid.uuid

    response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    assert isinstance(response, int)

def test_sqlitedb_tools_get_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')

    kpi_description = create_kpi_request.kpi_description
    kpi_sample_type = create_kpi_request.kpi_sample_type
    kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
    kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
    kpi_service_id = create_kpi_request.service_id.service_uuid.uuid

    kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)

    response = sql_db.get_KPI(kpi_id)
    assert isinstance(response, tuple)

def test_sqlitedb_tools_get_kpis(sql_db):
    LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
    response = sql_db.get_KPIS()
    assert isinstance(response, list)

def test_sqlitedb_tools_delete_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')

    response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
Javi Moreno's avatar
Javi Moreno committed

    if response == False:
        kpi_description = create_kpi_request.kpi_description
        kpi_sample_type = create_kpi_request.kpi_sample_type
        kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
        kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
        kpi_service_id = create_kpi_request.service_id.service_uuid.uuid

        sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
Javi Moreno's avatar
Javi Moreno committed

    assert response == True

def test_sqlitedb_tools_delete_kpid_id(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')

    response = sql_db.delete_kpid_id(1)

    if response == False:
        kpi_description = create_kpi_request.kpi_description
        kpi_sample_type = create_kpi_request.kpi_sample_type
        kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
        kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
        kpi_service_id = create_kpi_request.service_id.service_uuid.uuid

        kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_kpid_id(kpi_id)

    assert response == True


def test_influxdb_tools_write_kpi(influx_db):
    LOGGER.warning('test_influxdb_tools_write_kpi begin')

def test_influxdb_tools_read_kpi_points(influx_db):
    LOGGER.warning('test_influxdb_tools_read_kpi_points begin')


def test_events_tools(context_client_grpc: ContextClient,  # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,
    context_db_mb: Tuple[Database, MessageBroker]):
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    # # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    # ----- Update the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
    assert response.device_uuid.uuid == DEVICE_R1_UUID
Javi Moreno's avatar
Javi Moreno committed

    events_collector.stop()


def test_get_device_events(context_client_grpc: ContextClient,  # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,
    context_db_mb: Tuple[Database, MessageBroker]):

Javi Moreno's avatar
Javi Moreno committed
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client)
    events_collector.start()

    # # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    # ----- Check create event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)

    assert isinstance(event, DeviceEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID

    events_collector.stop()

def test_listen_events(monitoring_client: MonitoringClient,
    context_client_grpc: ContextClient,  # pylint: disable=redefined-outer-name
    context_db_mb: Tuple[Database, MessageBroker]):

    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client)
    events_collector.start()

    # # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    kpi_id_list = events_collector.listen_events()

    assert bool(kpi_id_list) == True

def test_socket_ports(address, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = s.connect_ex((address,port))

    if result == 0:
        print('socket is open')
    else:
        print('socket is not open')
    s.close()