From 9cfdce17b1ec690dff54c7b2ac624dfe4f7427d9 Mon Sep 17 00:00:00 2001 From: pfamelis <pfamelis@ubitech.eu> Date: Tue, 30 May 2023 09:39:58 +0300 Subject: [PATCH] wip: policy component calls recomputeConnection rpc --- .../eu/teraflow/policy/PolicyServiceImpl.java | 49 +++++++++++-------- .../policy/service/ServiceGatewayImpl.java | 1 + .../service/ServiceServiceServicerImpl.py | 3 +- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java b/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java index 1874b1546..df07cd37a 100644 --- a/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java +++ b/src/policy/src/main/java/eu/teraflow/policy/PolicyServiceImpl.java @@ -70,6 +70,8 @@ public class PolicyServiceImpl implements PolicyService { private static final int ACCEPTABLE_NUMBER_OF_ALARMS = 3; private static final int MONITORING_WINDOW_IN_SECONDS = 5; private static final int SAMPLING_RATE_PER_SECOND = 1; + // Temporary solution for not calling the same rpc more than it's needed + private static int noAlarms = 0; private static final PolicyRuleState INSERTED_POLICYRULE_STATE = new PolicyRuleState( @@ -213,6 +215,7 @@ public class PolicyServiceImpl implements PolicyService { LOGGER.infof("alarmDescriptor:"); LOGGER.infof(alarmDescriptor.toString()); alarmIds.add(monitoringService.setKpiAlarm(alarmDescriptor)); + noAlarms = 0; } LOGGER.infof("Passed 7th"); @@ -229,8 +232,7 @@ public class PolicyServiceImpl implements PolicyService { id -> { alarmPolicyRuleServiceMap.put(id, policyRuleService); - var alarmSubscription = new AlarmSubscription(id, 60, 500); - LOGGER.infof("Creating Alarm Subscription with id: %s", id); + var alarmSubscription = new AlarmSubscription(id, 60, 5000); return monitoringService.getAlarmResponseStream(alarmSubscription); })); } @@ -312,7 +314,7 @@ public class PolicyServiceImpl implements PolicyService { .subscribe() .with( alarmId -> { - alarmSubscriptionList.add(new AlarmSubscription(alarmId, 0, 0)); + alarmSubscriptionList.add(new AlarmSubscription(alarmId, 30, 4000)); }); } @@ -736,13 +738,11 @@ public class PolicyServiceImpl implements PolicyService { deserializedService -> { serviceService.recomputeConnections(deserializedService) .subscribe() - .with(x -> { - LOGGER.info("recomputeConnections failed with:"); - LOGGER.info(x); + .with( x -> { + LOGGER.info("called recomputeConnections with:"); + LOGGER.info(deserializedService); + setPolicyRuleServiceToContext(policyRuleService, ENFORCED_POLICYRULE_STATE); }); - LOGGER.info("called recomputeConnections with:"); - LOGGER.info(deserializedService); - setPolicyRuleServiceToContext(policyRuleService, ENFORCED_POLICYRULE_STATE); }); } @@ -757,18 +757,25 @@ public class PolicyServiceImpl implements PolicyService { PolicyRuleAction test = policyRuleActionMap.get(alarmId); LOGGER.info(test); - setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); - - switch (policyRuleAction.getPolicyRuleActionEnum()) { - case POLICY_RULE_ACTION_ADD_SERVICE_CONSTRAINT: - addServiceConstraint(policyRuleService, policyRuleAction); - case POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE: - addServiceConfigRule(policyRuleService, policyRuleAction); - case POLICY_RULE_ACTION_RECALCULATE_PATH: - callRecalculatePathRPC(policyRuleService, policyRuleAction); - default: - LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum()); - return; + if (noAlarms == 0) { + noAlarms++; + setPolicyRuleServiceToContext(policyRuleService, ACTIVE_POLICYRULE_STATE); + + switch (policyRuleAction.getPolicyRuleActionEnum()) { + case POLICY_RULE_ACTION_ADD_SERVICE_CONSTRAINT: + addServiceConstraint(policyRuleService, policyRuleAction); + case POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE: + addServiceConfigRule(policyRuleService, policyRuleAction); + case POLICY_RULE_ACTION_RECALCULATE_PATH: + callRecalculatePathRPC(policyRuleService, policyRuleAction); + default: + LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum()); + return; + } + } else if (noAlarms == 5) { + noAlarms = 0; + } else { + noAlarms++; } } diff --git a/src/policy/src/main/java/eu/teraflow/policy/service/ServiceGatewayImpl.java b/src/policy/src/main/java/eu/teraflow/policy/service/ServiceGatewayImpl.java index eeafcb6fb..6d399b2e6 100644 --- a/src/policy/src/main/java/eu/teraflow/policy/service/ServiceGatewayImpl.java +++ b/src/policy/src/main/java/eu/teraflow/policy/service/ServiceGatewayImpl.java @@ -58,6 +58,7 @@ public class ServiceGatewayImpl implements ServiceGateway { //final var dummyService = new Service(tmpServiceId, null, null, null, null, null, 0); service.getServiceEndPointIds().clear(); + service.getServiceConstraints().clear(); final var serializedService = serializer.serialize(service); LOGGER.info(serializedService); diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 6d23fd4ce..b0ed16d5c 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -174,6 +174,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty: + LOGGER.info("**************************************Called RecomputeConnections*********************************************") if len(request.service_endpoint_ids) > 0: raise NotImplementedException('update-endpoints') @@ -254,7 +255,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): pathcomp_request = PathCompRequest() pathcomp_request.services.append(updated_service_with_uuids) #pathcomp_request.k_disjoint_path.num_disjoint = 100 - pathcomp_request.k_shortest_path.k_inspection = 100 + pathcomp_request.k_shortest_path.k_inspection = 4 pathcomp_request.k_shortest_path.k_return = 3 LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) -- GitLab