Commit faaf7dd8 authored by kesnar's avatar kesnar
Browse files

feat: policy calls service rpc (wip)

parent ee163f7d
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -26,6 +26,8 @@ enum PolicyRuleActionEnum {
  POLICYRULE_ACTION_SET_DEVICE_STATUS = 1;
  POLICYRULE_ACTION_ADD_SERVICE_CONFIGRULE = 2;
  POLICYRULE_ACTION_ADD_SERVICE_CONSTRAINT = 3;
  POLICY_RULE_ACTION_CALL_SERVICE_RPC = 4;
  POLICY_RULE_ACTION_RECALCULATE_PATH = 5;
}

// Action configuration

src/policy/mvnw

100644 → 100755
+0 −0

File mode changed from 100644 to 100755.

+96 −64
Original line number Diff line number Diff line
@@ -218,32 +218,32 @@ public class PolicyServiceImpl implements PolicyService {
                            LOGGER.infof("Passed 7th");
                            LOGGER.infof("%s", alarmIds);

                            // Transform the alarmIds into promised alarms returned from the getAlarmResponseStream
                            // Transform the alarmIds into promised alarms returned from the
                            // getAlarmResponseStream
                            List<Multi<AlarmResponse>> alarmResponseStreamList = new ArrayList<>();
                            for (Uni<String> alarmId : alarmIds) {
                                alarmResponseStreamList.add(
                                    alarmId.onItem().transformToMulti(id -> {
                                        alarmId
                                                .onItem()
                                                .transformToMulti(
                                                        id -> {
                                                            alarmPolicyRuleServiceMap.put(id, policyRuleService);

                                                            var alarmSubscription = new AlarmSubscription(id, 60, 500);
                                                            LOGGER.infof("Creating Alarm Subscription with id: %s", id);
                                                            return monitoringService.getAlarmResponseStream(alarmSubscription);
                                    })
                                );
                                                        }));
                            }

                            // Merge the promised alarms into one stream (Multi Object)
                            final var multi = Multi.createBy().merging().streams(alarmResponseStreamList);

                            //multi
                            //    .subscribe()
                            //    .with(
                            //        x -> {
                            //            LOGGER.info(x);
                            //        }
                            //    );

                            monitorAlarmResponseForService(multi);

                            // Check that the stream has ended and in that case resubscribe
                            // multi.onCompletion().invoke(() -> LOGGER.infof("************************ THE STREAM
                            // HAS ENDED *****************"));

                            LOGGER.infof("Passed 8th");
                            // evaluateAction(policyRule, alarmDescriptorList, multi);

@@ -318,7 +318,8 @@ public class PolicyServiceImpl implements PolicyService {

                            //                          final var multi =
                            //                                  setAlarmResponseStream(
  //                                          policyRule, alarmDescriptorList, alarmSubscriptionList, false);
                            //                                          policyRule, alarmDescriptorList,
                            // alarmSubscriptionList, false);
                            //                          monitorAlarmResponseForDevice(multi);
                            //                          evaluateAction(policyRule, alarmDescriptorList, multi);

@@ -493,11 +494,15 @@ public class PolicyServiceImpl implements PolicyService {
    //                        id -> {
    //                            var alarmSubscription = new AlarmSubscription(id, 60, 500);
    //                            LOGGER.infof("Creating Alarm Subscription with id: %s", id);
    //
    // //alarmResponseStreamList.add(monitoringService.getAlarmResponseStream(alarmSubscription));
//                            monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x -> {LOGGER.info(x);});
    //
    // monitoringService.getAlarmResponseStream(alarmSubscription).subscribe().with(x ->
    // {LOGGER.info(x);});
    //                            LOGGER.infof("Created Alarm Subscription with id: %s", id);
    //                        });
    //        }
    //
    // LOGGER.info("*****************************************************************************");
    //
    //        for (Multi<AlarmResponse> a: alarmResponseStreamList) {
@@ -519,6 +524,7 @@ public class PolicyServiceImpl implements PolicyService {
                .subscribe()
                .with(
                        alarmResponse -> {
                            LOGGER.infof("**************************Received Alarm!**************************");
                            LOGGER.infof("alarmResponse:");
                            LOGGER.info(alarmResponse);
                            LOGGER.info(alarmResponse.getAlarmId());
@@ -717,11 +723,35 @@ public class PolicyServiceImpl implements PolicyService {
                        });
    }

    private void callRecalculatePathRPC(
            PolicyRuleService policyRuleService, PolicyRuleAction policyRuleAction) {

        LOGGER.info("Inside callRecalculatePathRPC");

        final var deserializedServiceUni = contextService.getService(policyRuleService.getServiceId());

        deserializedServiceUni
                .subscribe()
                .with(
                        deserializedService -> {
                            serviceService.recomputeConnections(deserializedService)
                            .subscribe()
                            .with(x -> {
                                LOGGER.info("recomputeConnections failed with:");
                                LOGGER.info(x);
                            });
                            LOGGER.info("called recomputeConnections with:");
                            LOGGER.info(deserializedService);
                            setPolicyRuleServiceToContext(policyRuleService, ENFORCED_POLICYRULE_STATE);
                        });
    }

    private void applyActionService(String alarmId) {
        LOGGER.info("Inside applyActionService");
        PolicyRuleService policyRuleService = alarmPolicyRuleServiceMap.get(alarmId);
        LOGGER.info(policyRuleService);
        PolicyRuleAction policyRuleAction = policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0);
        PolicyRuleAction policyRuleAction =
                policyRuleService.getPolicyRuleBasic().getPolicyRuleActions().get(0);
        LOGGER.info(policyRuleAction);

        PolicyRuleAction test = policyRuleActionMap.get(alarmId);
@@ -734,6 +764,8 @@ public class PolicyServiceImpl implements PolicyService {
                addServiceConstraint(policyRuleService, policyRuleAction);
            case POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE:
                addServiceConfigRule(policyRuleService, policyRuleAction);
            case POLICY_RULE_ACTION_RECALCULATE_PATH:
                callRecalculatePathRPC(policyRuleService, policyRuleAction);
            default:
                LOGGER.errorf(INVALID_MESSAGE, policyRuleAction.getPolicyRuleActionEnum());
                return;
+8 −0
Original line number Diff line number Diff line
@@ -1845,6 +1845,10 @@ public class Serializer {
                return PolicyAction.PolicyRuleActionEnum.POLICYRULE_ACTION_ADD_SERVICE_CONSTRAINT;
            case POLICY_RULE_ACTION_NO_ACTION:
                return PolicyAction.PolicyRuleActionEnum.POLICYRULE_ACTION_NO_ACTION;
            case POLICY_RULE_ACTION_CALL_SERVICE_RPC:
                return PolicyAction.PolicyRuleActionEnum.POLICY_RULE_ACTION_CALL_SERVICE_RPC;
            case POLICY_RULE_ACTION_RECALCULATE_PATH:
                return PolicyAction.PolicyRuleActionEnum.POLICY_RULE_ACTION_RECALCULATE_PATH;
            default:
                return PolicyAction.PolicyRuleActionEnum.UNRECOGNIZED;
        }
@@ -1859,6 +1863,10 @@ public class Serializer {
                return PolicyRuleActionEnum.POLICY_RULE_ACTION_ADD_SERVICE_CONFIGRULE;
            case POLICYRULE_ACTION_ADD_SERVICE_CONSTRAINT:
                return PolicyRuleActionEnum.POLICY_RULE_ACTION_ADD_SERVICE_CONSTRAINT;
            case POLICY_RULE_ACTION_CALL_SERVICE_RPC:
                return PolicyRuleActionEnum.POLICY_RULE_ACTION_CALL_SERVICE_RPC;
            case POLICY_RULE_ACTION_RECALCULATE_PATH:
                return PolicyRuleActionEnum.POLICY_RULE_ACTION_RECALCULATE_PATH;
            case POLICYRULE_ACTION_NO_ACTION:
            case UNRECOGNIZED:
            default:
+3 −1
Original line number Diff line number Diff line
@@ -20,5 +20,7 @@ public enum ServiceStatusEnum {
    UNDEFINED,
    PLANNED,
    ACTIVE,
    PENDING_REMOVAL
    PENDING_REMOVAL,
    SLA_VIOLATED,
    UPDATING
}
Loading