Skip to content
Snippets Groups Projects
Select Git revision
  • 362b28a9d147a4cb960efea553dfb8ba9d06b633
  • master default protected
  • release/6.0.0 protected
  • develop protected
  • feat/345-cttc-fix-ci-cd-and-unit-tests-for-dscm-pluggables
  • feat/349-new-monitoring-updates-for-optical-controller-integration
  • cnit_ofc26
  • ofc_polimi
  • CTTC-IMPLEMENT-NBI-CONNECTOR-NOS-ZTP
  • CTTC-TEST-SMARTNICS-6GMICROSDN-ZTP
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit_tapi
  • feat/330-tid-pcep-component
  • feat/tid-newer-pcep-component
  • feat/116-ubi-updates-in-telemetry-backend-to-support-p4-in-band-network-telemetry
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • v6.0.0 protected
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

test_policy.py

Blame
  • gifrerenom's avatar
    Lluis Gifre Renom authored
    - corrected unitary test order and one-by-one execution
    - extracted unitary test constants to separate file
    - added updated_at refresh for Service and Slice entities
    - corrected return types for Connection entity
    - prepared PolicyRule entity to raise events and corrected return types of methods
    362b28a9
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_policy.py 4.77 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import copy, grpc, pytest
    from common.proto.context_pb2 import Empty
    from common.proto.policy_pb2 import PolicyRuleId, PolicyRule
    from context.client.ContextClient import ContextClient
    from context.service.database.uuids.PolicuRule import policyrule_get_uuid
    from .Objects import POLICYRULE, POLICYRULE_ID, POLICYRULE_NAME
    
    @pytest.mark.depends(on=['context/tests/test_connection.py::test_connection'])
    def test_policy(context_client : ContextClient):
        """Exercise the PolicyRule life cycle through the Context client.

        The test walks through these steps in order, each depending on the
        state left by the previous one:

        1. Get/List while the policy rule does not exist (expects NOT_FOUND and
           empty lists).
        2. Create the rule with SetPolicyRule and read it back via Get/List.
        3. Update the rule priority with SetPolicyRule and verify the new value
           through both Get and List.
        4. Remove the rule and verify that Get reports NOT_FOUND again and that
           both lists are empty.

        Args:
            context_client: client used to issue the Context RPCs; presumably
                supplied as a pytest fixture defined elsewhere (not visible in
                this file).
        """

        # ----- Get when the object does not exist -------------------------------------------------------------------------
        policyrule_id = PolicyRuleId(**POLICYRULE_ID)
        # allow_random=False: the UUID must be derived from the given id, never randomly generated.
        policyrule_uuid = policyrule_get_uuid(policyrule_id, allow_random=False)

        with pytest.raises(grpc.RpcError) as e:
            context_client.GetPolicyRule(policyrule_id)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        MSG = 'PolicyRule({:s}) not found; policyrule_uuid generated was: {:s}'
        assert e.value.details() == MSG.format(POLICYRULE_NAME, policyrule_uuid)

        # ----- List when the object does not exist ------------------------------------------------------------------------
        response = context_client.ListPolicyRuleIds(Empty())
        assert len(response.policyRuleIdList) == 0

        response = context_client.ListPolicyRules(Empty())
        assert len(response.policyRules) == 0

        # ----- Create the object ------------------------------------------------------------------------------------------
        response = context_client.SetPolicyRule(PolicyRule(**POLICYRULE))
        assert response.uuid.uuid == policyrule_uuid

        # ----- Get when the object exists ---------------------------------------------------------------------------------
        response = context_client.GetPolicyRule(PolicyRuleId(**POLICYRULE_ID))
        assert response.device.policyRuleBasic.policyRuleId.uuid.uuid == policyrule_uuid
        assert response.device.policyRuleBasic.priority == 1

        # ----- List when the object exists --------------------------------------------------------------------------------
        response = context_client.ListPolicyRuleIds(Empty())
        assert len(response.policyRuleIdList) == 1
        assert response.policyRuleIdList[0].uuid.uuid == policyrule_uuid

        response = context_client.ListPolicyRules(Empty())
        assert len(response.policyRules) == 1
        assert response.policyRules[0].device.policyRuleBasic.policyRuleId.uuid.uuid == policyrule_uuid
        assert response.policyRules[0].device.policyRuleBasic.priority == 1

        # ----- Update the object ------------------------------------------------------------------------------------------
        new_policy_priority = 100
        # Deep copy so the shared POLICYRULE constant is not mutated for other tests.
        POLICYRULE_UPDATED = copy.deepcopy(POLICYRULE)
        POLICYRULE_UPDATED['device']['policyRuleBasic']['priority'] = new_policy_priority
        response = context_client.SetPolicyRule(PolicyRule(**POLICYRULE_UPDATED))
        assert response.uuid.uuid == policyrule_uuid

        # ----- Get when the object is modified ----------------------------------------------------------------------------
        response = context_client.GetPolicyRule(PolicyRuleId(**POLICYRULE_ID))
        assert response.device.policyRuleBasic.policyRuleId.uuid.uuid == policyrule_uuid
        # Verify the update is visible through Get as well, not only through List.
        assert response.device.policyRuleBasic.priority == new_policy_priority

        # ----- List when the object is modified ---------------------------------------------------------------------------
        response = context_client.ListPolicyRuleIds(Empty())
        assert len(response.policyRuleIdList) == 1
        assert response.policyRuleIdList[0].uuid.uuid == policyrule_uuid

        response = context_client.ListPolicyRules(Empty())
        assert len(response.policyRules) == 1
        assert response.policyRules[0].device.policyRuleBasic.policyRuleId.uuid.uuid == policyrule_uuid
        assert response.policyRules[0].device.policyRuleBasic.priority == new_policy_priority

        # ----- Remove the object ------------------------------------------------------------------------------------------
        context_client.RemovePolicyRule(PolicyRuleId(**POLICYRULE_ID))

        # ----- Get after deleting the object ------------------------------------------------------------------------------
        # The rule must be gone again, mirroring the NOT_FOUND check at the start of the test.
        with pytest.raises(grpc.RpcError) as e:
            context_client.GetPolicyRule(PolicyRuleId(**POLICYRULE_ID))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND

        # ----- List after deleting the object -----------------------------------------------------------------------------
        response = context_client.ListPolicyRuleIds(Empty())
        assert len(response.policyRuleIdList) == 0

        response = context_client.ListPolicyRules(Empty())
        assert len(response.policyRules) == 0