Commit 4f1271a7 authored by George Tziavas's avatar George Tziavas
Browse files

added routes, sent them to tmf

parent 4564d6af
Loading
Loading
Loading
Loading
Loading
+0 −44
Original line number Diff line number Diff line
//package org.etsi.osl.metrico;
//
//
//import org.apache.camel.LoggingLevel;
//import org.apache.camel.builder.RouteBuilder;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
//import org.etsi.osl.metrico.prometheus.PrometheusQueries;
//import org.etsi.osl.metrico.reposervices.JobRepoService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.stereotype.Component;
//import org.etsi.osl.centrallog.client.CentralLogger;
//
//@Configuration
//@Component
//public class MetricoRouteBuilder extends RouteBuilder {
//
//    private static final transient Log logger = LogFactory.getLog(MetricoRouteBuilder.class);
//
//    final JobRepoService jobRepoService;
//    private final PrometheusQueries prometheusQueries;
//
//    public MetricoRouteBuilder(JobRepoService jobRepoService, PrometheusQueries prometheusQueries) {
//        this.jobRepoService = jobRepoService;
//        this.prometheusQueries = prometheusQueries;
//    }
//
//    @Value("${MEASUREMENT_COLLECTION_JOB_CREATED}")
//    private static String MEASUREMENT_COLLECTION_JOB_CREATED;
//
//    @Value("${MEASUREMENT_COLLECTION_JOB_RESPONSE}")
//    private static String MEASUREMENT_COLLECTION_JOB_RESPONSE ;
//
//    public  void configure() throws Exception{
//        from(MEASUREMENT_COLLECTION_JOB_CREATED)
//                .log(LoggingLevel.INFO, log, MEASUREMENT_COLLECTION_JOB_CREATED + "message received!")
//                .to("log:DEBUG?showBody=true&showHeaders=true")
//                .setBody(simple("Message received and processed"))
//                .to(MEASUREMENT_COLLECTION_JOB_RESPONSE);
//
//    }
//}
+39 −9
Original line number Diff line number Diff line
package org.etsi.osl.metrico.services;

import jakarta.validation.constraints.NotNull;
import org.apache.camel.ProducerTemplate;
import org.etsi.osl.metrico.mapper.JobMapper;
import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.prometheus.PrometheusQueries;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJob;
import org.etsi.osl.tmf.pm628.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

@Service
public class MetricoService {

    private static final Logger logger = LoggerFactory.getLogger(JobService.class);

    @Value("{PM_MEASUREMENT_COLLECTION_JOB_UPDATE}")
    private String PM_MEASUREMENT_COLLECTION_JOB_UPDATE = "";

    private final PrometheusQueries prometheusQueries;
    private final ProducerTemplate producerTemplate;

    public MetricoService(PrometheusQueries prometheusQueries) {
    public MetricoService(PrometheusQueries prometheusQueries, ProducerTemplate producerTemplate) {
        this.prometheusQueries = prometheusQueries;
        this.producerTemplate = producerTemplate;
    }

    public String sendQueryToPrometheus(String promURL, String promQuery) {
@@ -28,16 +36,36 @@ public class MetricoService {
    }

    public String[] queryToPrometheus(@NotNull MeasurementCollectionJob givenMCJ){

        DataAccessEndpoint givenDataAccessEndpoint = givenMCJ.getDataAccessEndpoint().get(0);
        Job job = JobMapper.measurementCollectionJobMapToJob(givenMCJ);
        String promURL = job.getDataAccessEndPointUri().getScheme() + "://" + job.getDataAccessEndPointUri().getAuthority();
        String promQuery = job.getDataAccessEndPointUri().getQuery();
        promQuery = promQuery.replace("query=", "");

        return sendQueryToPrometheus(promURL, promQuery).split("\n");
        String [] promResponse = sendQueryToPrometheus(promURL, promQuery).split("\n");

        DataFilterTemplate filterTemplate = new DataFilterTemplate();
        filterTemplate.setName(promQuery);
        DataFilterAttributeStringArray stringArray = new DataFilterAttributeStringArray();
        stringArray.setValue(List.of(promResponse));
        DataFilterMapItem dataFilterMapItem = new DataFilterMapItem();
        dataFilterMapItem.setFilterTemplate(filterTemplate);
        dataFilterMapItem.setStringArray(stringArray);
        DataFilterMap dataFilterMap = new DataFilterMap();
        dataFilterMap.addMappingsItem(dataFilterMapItem);
        givenDataAccessEndpoint.setUriQueryFilter(dataFilterMap);

        List<DataAccessEndpoint> newDataAccessEndpoint = new ArrayList<>();
        newDataAccessEndpoint.add(givenDataAccessEndpoint);

        givenMCJ.setDataAccessEndpoint(newDataAccessEndpoint);

        producerTemplate.sendBody(PM_MEASUREMENT_COLLECTION_JOB_UPDATE, givenMCJ);

        return promResponse;
    }

    public void startPeriodicQueryToPrometheus(@NotNull MeasurementCollectionJob givenMCJ){
    public String startPeriodicQueryToPrometheus(@NotNull MeasurementCollectionJob givenMCJ){
        Job job = JobMapper.measurementCollectionJobMapToJob(givenMCJ);
        String promURL = job.getDataAccessEndPointUri().getScheme() + "://" + job.getDataAccessEndPointUri().getAuthority();
        String promQuery = job.getDataAccessEndPointUri().getQuery();
@@ -56,9 +84,11 @@ public class MetricoService {
        Job newPeriodicQuery =  prometheusQueries.startPeriodicQuery(promURL, promQuery,job.getStartDateTime(), job.getEndDateTime(), job.getExecutionInterval());

        if (newPeriodicQuery.getState() == ExecutionStateType.FAILED) {
            logger.atError().setMessage("Periodic query failed to start due to internal error.");
            logger.atError().setMessage("Periodic query failed to start due to internal error.").log();
            return "Periodic query failed to start.";
        } else {
            logger.atError().setMessage("Periodic query started, with ID: " + newPeriodicQuery.getUuid());
            logger.atInfo().setMessage("Periodic query started, with ID: " + newPeriodicQuery.getUuid()).log();
            return "Periodic query started with ID: " + newPeriodicQuery.getUuid();
        }
    }

+38 −0
Original line number Diff line number Diff line
package org.etsi.osl.metrico.services;

import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJob;
import org.etsi.osl.tmf.so641.model.ServiceOrderUpdate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class MetricoServiceRouteBuilder extends RouteBuilder {

    private static final transient Log logger = LogFactory.getLog(MetricoServiceRouteBuilder.class.getName());

    @Autowired
    private MetricoService metricoService;

    @Value("${PM_MEASUREMENT_COLLECTION_JOB_CREATED}")
    private String PM_MEASUREMENT_COLLECTION_JOB_CREATED = "";

    public void configure() throws Exception {

        from(PM_MEASUREMENT_COLLECTION_JOB_CREATED)
                .log(LoggingLevel.INFO, log, PM_MEASUREMENT_COLLECTION_JOB_CREATED + " message received!")
                .to("log:DEBUG?showBody=true&showHeaders=true").unmarshal()
                .json(JsonLibrary.Jackson, MeasurementCollectionJob.class, true)
                .bean(metricoService, "startPeriodicQueryToPrometheus(${body})");

    }


}
+5 −7
Original line number Diff line number Diff line
@@ -69,7 +69,5 @@ PM_GET_MEASUREMENT_COLLECTION_JOB_BY_ID: "jms:queue:PM.GET.MEASUREMENTCOLLECTION
PM_GET_MEASUREMENT_COLLECTION_JOBS:      "jms:queue:PM.GET.MEASUREMENTCOLLECTIONJOBS"
PM_ADD_MEASUREMENT_COLLECTION_JOB:       "jms:queue:PM.ADD.MEASUREMENTCOLLECTIONJOB"
PM_UPDATE_MEASUREMENT_COLLECTION_JOB:    "jms:queue:PM.UPD.MEASUREMENTCOLLECTIONJOB"

# COMMON QUEUES
MEASUREMENT_COLLECTION_JOB_CREATED: "jms:queue:MEASUREMENT_COLLECTION_JOB.CREATED"
MEASUREMENT_COLLECTION_JOB_RESPONSE: "jms:queue:MEASUREMENT_COLLECTION_JOB.RESPONSE"
 No newline at end of file
PM_MEASUREMENT_COLLECTION_JOB_CREATED:   "jms:queue:PM.MEASUREMENTCOLLECTIONJOB.CREATED"
PM_MEASUREMENT_COLLECTION_JOB_UPDATE:    "jms:queue:PM.MEASUREMENTCOLLECTIONJOB.UPDATE"