Commit 7dd42f5d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/context-policy' into 'develop'

Add support in Context to persist Policies

See merge request !28
parents 913175b0 124fee1d
Loading
Loading
Loading
Loading
+48 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List, Optional
from common.proto.policy_condition_pb2 import BooleanOperator

LOGGER = logging.getLogger(__name__)

def json_policy_rule_id(policy_rule_uuid : str) -> Dict:
    return {'uuid': {'uuid': policy_rule_uuid}}

def json_policy_rule(
    policy_rule_uuid : str, policy_priority : int = 1,
    boolean_operator : BooleanOperator = BooleanOperator.POLICYRULE_CONDITION_BOOLEAN_AND,
    condition_list : List[Dict] = [], action_list : List[Dict] = [],
    service_id : Optional[Dict] = None, device_id_list : List[Dict] = []
) -> Dict:
    basic = {
        'policyRuleId': json_policy_rule_id(policy_rule_uuid),
        'priority': policy_priority,
        'conditionList': condition_list,
        'booleanOperator': boolean_operator,
        'actionList': action_list,
    }

    result = {}
    if service_id is not None:
        policy_rule_type = 'service'
        result[policy_rule_type] = {'policyRuleBasic': basic}
        result[policy_rule_type]['serviceId'] = service_id
    else:
        policy_rule_type = 'device'
        result[policy_rule_type] = {'policyRuleBasic': basic}

    result[policy_rule_type]['deviceList'] = device_id_list
    return result
+40 −0
Original line number Diff line number Diff line
@@ -28,6 +28,8 @@ from common.proto.context_pb2 import (
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from common.proto.context_pb2_grpc import ContextServiceStub
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
@@ -42,17 +44,20 @@ class ContextClient:
        LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
        self.channel = None
        self.stub = None
        self.policy_stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = ContextServiceStub(self.channel)
        self.policy_stub = ContextPolicyServiceStub(self.channel)

    def close(self):
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None
        self.policy_stub = None

    @RETRY_DECORATOR
    def ListContextIds(self, request: Empty) -> ContextIdList:
@@ -361,3 +366,38 @@ class ContextClient:
        response = self.stub.GetConnectionEvents(request)
        LOGGER.debug('GetConnectionEvents result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def ListPolicyRuleIds(self, request: Empty) -> PolicyRuleIdList:
        LOGGER.debug('ListPolicyRuleIds request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.policy_stub.ListPolicyRuleIds(request)
        LOGGER.debug('ListPolicyRuleIds result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def ListPolicyRules(self, request: Empty) -> PolicyRuleList:
        LOGGER.debug('ListPolicyRules request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.policy_stub.ListPolicyRules(request)
        LOGGER.debug('ListPolicyRules result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetPolicyRule(self, request: PolicyRuleId) -> PolicyRule:
        LOGGER.info('GetPolicyRule request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.policy_stub.GetPolicyRule(request)
        LOGGER.info('GetPolicyRule result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def SetPolicyRule(self, request: PolicyRule) -> PolicyRuleId:
        LOGGER.debug('SetPolicyRule request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.policy_stub.SetPolicyRule(request)
        LOGGER.debug('SetPolicyRule result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def RemovePolicyRule(self, request: PolicyRuleId) -> Empty:
        LOGGER.debug('RemovePolicyRule request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.policy_stub.RemovePolicyRule(request)
        LOGGER.debug('RemovePolicyRule result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
+32 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import json
from typing import Dict
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model

LOGGER = logging.getLogger(__name__)

class PolicyRuleModel(Model):
    pk = PrimaryKeyField()
    value = StringField(required=True, allow_empty=False)

    def dump_id(self) -> Dict:
        return {'uuid': {'uuid': self.pk}}

    def dump(self) -> Dict:
        return json.loads(self.value)
+2 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.orm.Database import Database
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.context_policy_pb2_grpc import add_ContextPolicyServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from .ContextServiceServicerImpl import ContextServiceServicerImpl

@@ -31,3 +32,4 @@ class ContextService(GenericGrpcService):

    def install_servicers(self):
        add_ContextServiceServicer_to_server(self.context_servicer, self.server)
        add_ContextPolicyServiceServicer_to_server(self.context_servicer, self.server)
+59 −1
Original line number Diff line number Diff line
@@ -28,13 +28,17 @@ from common.proto.context_pb2 import (
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from common.proto.policy_pb2 import (PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule)
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
from common.tools.grpc.Tools import grpc_message_to_json
from context.service.database.ConfigModel import update_config
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import set_constraints
from context.service.database.ContextModel import ContextModel
from context.service.database.PolicyRuleModel import PolicyRuleModel
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers
from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
from context.service.database.Events import notify_event
@@ -61,11 +65,12 @@ METHOD_NAMES = [
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    'UnsetService',      'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer):
class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    def __init__(self, database : Database, messagebroker : MessageBroker):
        LOGGER.debug('Creating Servicer...')
        self.lock = threading.Lock()
@@ -813,3 +818,56 @@ class ContextServiceServicerImpl(ContextServiceServicer):
    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
            yield ConnectionEvent(**json.loads(message.content))


    # ----- Policy -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListPolicyRuleIds(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleIdList:
        with self.lock:
            db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel)
            db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk'))
            return PolicyRuleIdList(policyRuleIdList=[db_policy_rule.dump_id() for db_policy_rule in db_policy_rules])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListPolicyRules(self, request: Empty, context: grpc.ServicerContext) -> PolicyRuleList:
        with self.lock:
            db_policy_rules: List[PolicyRuleModel] = get_all_objects(self.database, PolicyRuleModel)
            db_policy_rules = sorted(db_policy_rules, key=operator.attrgetter('pk'))
            return PolicyRuleList(policyRules=[db_policy_rule.dump() for db_policy_rule in db_policy_rules])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetPolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule:
        with self.lock:
            policy_rule_uuid = request.uuid.uuid
            db_policy_rule: PolicyRuleModel = get_object(self.database, PolicyRuleModel, policy_rule_uuid)
            return PolicyRule(**db_policy_rule.dump())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetPolicyRule(self, request: PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId:
        with self.lock:
            policy_rule_type = request.WhichOneof('policy_rule')
            policy_rule_json = grpc_message_to_json(request)
            policy_rule_uuid = policy_rule_json[policy_rule_type]['policyRuleBasic']['policyRuleId']['uuid']['uuid']
            result: Tuple[PolicyRuleModel, bool] = update_or_create_object(
                self.database, PolicyRuleModel, policy_rule_uuid, {'value': json.dumps(policy_rule_json)})
            db_policy, updated = result # pylint: disable=unused-variable

            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_policy_id = db_policy.dump_id()
            #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id})
            return PolicyRuleId(**dict_policy_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemovePolicyRule(self, request: PolicyRuleId, context: grpc.ServicerContext) -> Empty:
        with self.lock:
            policy_uuid = request.uuid.uuid
            db_policy = PolicyRuleModel(self.database, policy_uuid, auto_load=False)
            found = db_policy.load()
            if not found: return Empty()

            dict_policy_id = db_policy.dump_id()
            db_policy.delete()
            #event_type = EventTypeEnum.EVENTTYPE_REMOVE
            #notify_event(self.messagebroker, TOPIC_POLICY, event_type, {"policy_id": dict_policy_id})
            return Empty()
Loading