Commit 32fc3509 authored by George Tziavas's avatar George Tziavas
Browse files

JPA works, UUIDs are created successfully

parent 8f06e138
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -176,7 +176,6 @@
			<version>${mysql-connector.version}</version>
		</dependency>


        <!-- Testing -->
		<!-- <dependency>
			<groupId>org.springframework.boot</groupId>
+12 −2
Original line number Diff line number Diff line
@@ -4,8 +4,10 @@ package org.etsi.osl.metrico;
import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.model.StartPeriodicQueryRequest;
import org.etsi.osl.metrico.prometheus.PrometheusQueries;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -18,7 +20,12 @@ public class MetricoController {

    private static final Logger logger = LoggerFactory.getLogger(MetricoController.class);

    private PrometheusQueries prometheusQueries;
    private final PrometheusQueries prometheusQueries;

    @Autowired
    public MetricoController(PrometheusQueries prometheusQueries) {
        this.prometheusQueries = prometheusQueries;
    }

    @GetMapping("/live")
    public ResponseEntity<String> livenessCheck() {
@@ -60,8 +67,11 @@ public class MetricoController {
            logger.atDebug().setMessage("/startPeriodicQuery endpoint called without a stopAfterSeconds. Job will not stop by itself.").log();
        }
        String prom_url = request.getProtocol() + "://" + request.getProm_ip() + ":" + request.getProm_port();
        Job newPeriodicQuery = PrometheusQueries.startPeriodicQuery(prom_url, request.getQuery(), request.getStartDateTime(), request.getEndDateTime(), request.getExecutionInterval()
        Job newPeriodicQuery = prometheusQueries.startPeriodicQuery(prom_url, request.getQuery(), request.getStartDateTime(), request.getEndDateTime(), request.getExecutionInterval()
        );
        if(newPeriodicQuery.getState()== ExecutionStateType.FAILED){
            return new ResponseEntity<>("Periodic query failed to start due to internal error.", HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return new ResponseEntity<>("Periodic query started, with ID: " + newPeriodicQuery.getUuid(), HttpStatus.OK);
    }

+9 −2
Original line number Diff line number Diff line
@@ -6,12 +6,19 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.ApplicationContext;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;


@SpringBootApplication
@EnableJpaRepositories
@EnableJpaRepositories("org.etsi.osl.metrico.repo")
@EntityScan( basePackages = {
        "org.etsi.osl.metrico.repo",
        "org.etsi.osl.metrico.model",
        "org.etsi.osl.metrico",
        "org.etsi.osl.metrico.reposervices"
})
        public class MetricoSpringBoot implements CommandLineRunner {

  private static final Logger logger =
+11 −90
Original line number Diff line number Diff line
@@ -4,12 +4,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.Setter;
import org.etsi.osl.metrico.repo.JobRepository;
import org.etsi.osl.metrico.reposervices.JobRepoService;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.hibernate.annotations.GenericGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;

import java.time.Duration;
@@ -39,6 +39,16 @@ public class Job{

    private Integer executionInterval;

    private UUID dataAccessEndPointRef; // Get the prometheus IP (uri) and its state

    private UUID scheduleDefinitionRef;  //Get the startDateTime, endDateTime and executionInterval (recurringFrequency)

    // Get the reporting period and collection granularity(the periods are too big),
    // Consuming / producing application id,
    // scheduleDefinitionRef -> Get the startDateTime, endDateTime and executionInterval (recurringFrequency)
    // Should I check the granularity or the scheduleDefinitionRef for the recurringFrequency?
    private UUID measurementCollectionJobRef;

    @JsonIgnore
    private boolean deleted = false;

@@ -74,93 +84,4 @@ public class Job{
        this(startDateTime, endDateTime);
        this.executionInterval = executionInterval;
    }




    private static final Logger logger = LoggerFactory.getLogger(Job.class);
    @Getter
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private static final Map<UUID, Job> jobs = new ConcurrentHashMap<>();

    private static JobRepoService jobRepoService;

    /**
     * Schedules a new job to be executed periodically based on the provided parameters.
     * <p>
     * This method initializes a new {@link Job} instance with the specified start and end date times, and execution interval.
     * It calculates the initial delay as the difference in seconds between the current time and the start date time.
     * The job is initially set to a PENDING state. It then attempts to schedule the job to run at a fixed rate,
     * starting after the initial delay and subsequently with the specified execution interval.
     * If the job is successfully scheduled, its state is updated to INPROGRESS, and it is logged.
     * In case of a scheduling failure due to a {@link RejectedExecutionException}, the job's state is set to FAILED,
     * and the error is logged.
     * </p>
     *
     * @param task the {@link Runnable} task that the job will execute
     * @param startDateTime the {@link OffsetDateTime} specifying when the job should start
     * @param endDateTime the {@link OffsetDateTime} specifying when the job should end
     * @param executionInterval the interval, in seconds, between successive executions of the job
     * @return the newly created and scheduled {@link Job} instance
     * @throws RejectedExecutionException if the task cannot be scheduled for execution
     */
    public static Job startJob(Runnable task, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) {
        Job job = new Job(startDateTime, endDateTime, executionInterval);
        long initialDelay = Duration.between(OffsetDateTime.now(), startDateTime).getSeconds();
        job.setState(ExecutionStateType.PENDING);
        jobRepoService.createAndSaveJob();
        UUID jobId = job.getUuid();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task, initialDelay, executionInterval, TimeUnit.SECONDS);
            job.setFuture(future);
            jobs.put(jobId, job);
            job.setState(ExecutionStateType.INPROGRESS);
            logger.info("Job with ID {} started successfully.", jobId);
            jobRepoService.updateJob(jobId, job.getState());
        } catch (RejectedExecutionException e) {
            job.setState(ExecutionStateType.FAILED);
            logger.error("Job with ID {} could not be scheduled.", jobId, e);
        }
        return job;
    }

    /**
     * Attempts to stop a job identified by its UUID.
     * <p>
     * This method checks if the job exists and its current state. If the job is already in a
     * {@link ExecutionStateType#CANCELLED} or {@link ExecutionStateType#COMPLETED} state, it logs
     * the status and returns. Otherwise, it attempts to cancel the job's future execution.
     * If the cancellation is successful, the job's state is set to {@link ExecutionStateType#CANCELLED},
     * and it logs the successful stop. If the job cannot be stopped (e.g., already completed or cancelled),
     * its state is set to {@link ExecutionStateType#PENDING}, and a warning is logged. If the job does not
     * exist, it logs a warning.
     * </p>
     *
     * @param jobId the UUID of the job to stop
     */
    public static void stopJob(UUID jobId) {
        Job job = jobs.get(jobId);
        if (job != null) {
            if (job.getState() == ExecutionStateType.CANCELLED ) {
                logger.info("Job with ID {} is already CANCELED.", jobId);
                return;
            } else if (job.getState() == ExecutionStateType.COMPLETED) {
                logger.info("Job with ID {} is already COMPLETED.", jobId);
                return;
            }
            if (job.getFuture() != null) {
                boolean wasCancelled = job.getFuture().cancel(true);
                if (wasCancelled) {
                    job.setState(ExecutionStateType.CANCELLED);
                    logger.info("Job with ID {} stopped successfully.", jobId);
                } else {
                    job.setState(ExecutionStateType.PENDING);
                    logger.warn("Job with ID {} could not be stopped because it has already completed, has been cancelled, or could not be cancelled for some other reason.", jobId);
                }
            }
        } else {
            logger.warn("Job with ID {} does not exist.", jobId);
        }
    }

}
+16 −5
Original line number Diff line number Diff line
@@ -2,10 +2,13 @@ package org.etsi.osl.metrico.prometheus;


import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.services.JobService;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

@@ -16,9 +19,15 @@ import java.util.concurrent.TimeUnit;
/**
 * This class contains the Prometheus Queries for the metrics.
 */
@Service
public class PrometheusQueries {

    private static final Logger logger = LoggerFactory.getLogger(PrometheusQueries.class);
    private final JobService jobService;

    public PrometheusQueries(JobService jobService) {
        this.jobService = jobService;
    }

    public static String sendQueryToPrometheus(String prometheusUrl, String query){
        RestTemplate restTemplate = new RestTemplate();
@@ -33,7 +42,7 @@ public class PrometheusQueries {
        return response.getBody();
    }

    public static Job startPeriodicQuery(String prometheusUrl, String query, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) {
    public Job startPeriodicQuery(String prometheusUrl, String query, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) {

        final Runnable queryHandler = new Runnable() {
            public void run() {
@@ -42,13 +51,15 @@ public class PrometheusQueries {
                }).start();
            }
        };
        Job job = Job.startJob(queryHandler, startDateTime, endDateTime, executionInterval);

        Job job = jobService.startJob(queryHandler, startDateTime, endDateTime, executionInterval);
        if(job.getState() == ExecutionStateType.FAILED){
            return job;
        }
        // Schedule a task to stop the job after the specified delay
        if (endDateTime != null) {
            long stopAfterSeconds = Duration.between(OffsetDateTime.now(), endDateTime).getSeconds();
            Job.getScheduler().schedule(() -> {
                Job.stopJob(job.getUuid());
            JobService.getScheduler().schedule(() -> {
                JobService.stopJob(job.getUuid());
                job.setState(ExecutionStateType.COMPLETED);
            }, stopAfterSeconds, TimeUnit.SECONDS);
        }
Loading