Commit eb80a6da authored by Andrea Sgambelluri's avatar Andrea Sgambelluri
Browse files

last bugs

parent f4fe80d2
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -393,6 +393,8 @@ public class CommonPolicyServiceImpl {
        final var policyRuleTypeService = new PolicyRuleTypeService(policyRuleService);
        final var policyRule = new PolicyRule(policyRuleTypeService);
        contextService.setPolicyRule(policyRule).subscribe().with(x -> {});

        LOGGER.infof("Policy Rule state is now [%s]", policyRuleState.toString());
    }

    public void setPolicyRuleDeviceToContext(
@@ -404,6 +406,6 @@ public class CommonPolicyServiceImpl {

        final var policyRuleTypeService = new PolicyRuleTypeDevice(policyRuleDevice);
        final var policyRule = new PolicyRule(policyRuleTypeService);
        contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
        final var policyRuleId = contextService.setPolicyRule(policyRule).subscribe().with(x -> {});
    }
}
+0 −117
Original line number Diff line number Diff line


import logging

from common.proto.kpi_manager_pb2 import KpiId
import time

from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType

from tests.ofc26_flexscale.test_ofc26_messages import create_kpi_descriptor_request
from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
from src.tests.ofc26_flexscale.test_ofc26_messages import create_basic_sub_request_parameters

WITH_TFS = True     #True/False
if WITH_TFS:
    from .Fixtures import kpi_manager_client
else:
    from .mock_tfs_services import kpi_manager_client

LOGGER = logging.getLogger(__name__)


def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
    _create_kpi_request                                    = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid                 = "6e22f180-ba28-4641-b190-2287bf447777"
    _create_kpi_request.kpi_description                    = descriptor_name
    _create_kpi_request.kpi_sample_type                    = KpiSampleType.KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER
    _create_kpi_request.device_id.device_uuid.uuid         = "ddb3ef8e-ee65-5cf9-9d21-dac56a27f85b"         # confirm for TFS
    _create_kpi_request.service_id.service_uuid.uuid       = "b2a60c5b-8c46-5707-a64a-9c6539d395f2"
    # _create_kpi_request.slice_id.slice_uuid.uuid           = 'SLC1'
    # _create_kpi_request.endpoint_id.endpoint_uuid.uuid     = str(uuid.uuid4())
    # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' 
    # _create_kpi_request.link_id.link_uuid.uuid             = 'LNK1' 
    return _create_kpi_request


def test_Complete_MGON_Integration(kpi_manager_client):
    
    # 1. KPI Descriptor Creation
    LOGGER.info(" >>> test_Complete_MGON_Integration: START <<< ")
    kpi_descriptor_obj = create_kpi_descriptor_request()
    _search_kpi_id = kpi_manager_pb2.KpiId()
    _search_kpi_id = kpi_descriptor_obj.kpi_id
    # response = kpi_manager_client.GetKpiDescriptor(_search_kpi_id)
    
    # if isinstance(response, kpi_manager_pb2.KpiDescriptor):
    #     LOGGER.info("KPI Descriptor already exists with ID: %s. Skipping creation.", _search_kpi_id.kpi_id.uuid)
    # else:
    # LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it.", _search_kpi_id.kpi_id.uuid)
    response = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_obj)
    LOGGER.info("Response gRPC message object: {:}".format(response))
    assert isinstance(response, KpiId)
        

    # 2. Telemetry Collector Creation
    sub_parameters = create_basic_sub_request_parameters()
    LOGGER.info("Subscription parameters: %s", sub_parameters)

    collector = GNMIOpenConfigCollector(
        address     = sub_parameters.get('host',        ''),
        port        = sub_parameters.get('port',        -1),
        username    = sub_parameters.get('username',    None),
        password    = sub_parameters.get('password',    None),
        insecure    = sub_parameters.get('insecure',    None),
        skip_verify = sub_parameters.get('skip_verify', True),
    )
    if not collector.Connect():
        LOGGER.error("Failed to connect to the collector")
        return

    LOGGER.info("----- Testing State Subscription -----")
    
    sub_data = [(
                "x123",
                {
                    "kpi"      : sub_parameters['kpi'],
                    "endpoint" : sub_parameters['endpoint'],
                    "resource" : sub_parameters['resource'],
                },
                sub_parameters['duration'],
                sub_parameters['sample_interval'],
            ),]
    
    response = collector.SubscribeState(sub_data)
    if response is None:
        LOGGER.error("Subscription failed.")
        return
    
    LOGGER.info("Subscription started: Status: %s, Data: %s", response, sub_data)
    
    test_get_state_updates(collector, sub_data)
    
    LOGGER.info("Sleeping...")
    time.sleep(600)
    LOGGER.info("Done sleeping.")
    LOGGER.info(" >>> test_Complete_MGON_Integration: END <<< ")


def test_get_state_updates(collector, subscription_data):
    """Test getting state updates."""
    LOGGER.info("----- Testing State Updates -----")
    collector.SubscribeState(subscription_data)
    
    LOGGER.info("Requesting state updates for 300 seconds ...")
    updates_received = []
    for samples in collector.GetState(duration=300, blocking=True):
        LOGGER.info("Received state update: %s", samples)
        updates_received.append(samples)
    
    assert len(updates_received) > 0


if __name__ == "__main__":
    test_Complete_MGON_Integration(kpi_manager_client)