diff --git a/pom.xml b/pom.xml index 850d4683e2fcadeefdd9a7cad8834c3636b4231c..3220e40fb85c75f8f635b614bb8a3302f7ff2827 100644 --- a/pom.xml +++ b/pom.xml @@ -176,7 +176,6 @@ <version>${mysql-connector.version}</version> </dependency> - <!-- Testing --> <!-- <dependency> <groupId>org.springframework.boot</groupId> diff --git a/src/main/java/org/etsi/osl/metrico/MetricoController.java b/src/main/java/org/etsi/osl/metrico/MetricoController.java index 099ad667c522fae154a88feea60103956fcfb398..e174511b93cddabd650354a51bcd6fd94d1634db 100644 --- a/src/main/java/org/etsi/osl/metrico/MetricoController.java +++ b/src/main/java/org/etsi/osl/metrico/MetricoController.java @@ -4,8 +4,10 @@ package org.etsi.osl.metrico; import org.etsi.osl.metrico.model.Job; import org.etsi.osl.metrico.model.StartPeriodicQueryRequest; import org.etsi.osl.metrico.prometheus.PrometheusQueries; +import org.etsi.osl.tmf.pm628.model.ExecutionStateType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -18,7 +20,12 @@ public class MetricoController { private static final Logger logger = LoggerFactory.getLogger(MetricoController.class); - private PrometheusQueries prometheusQueries; + private final PrometheusQueries prometheusQueries; + + @Autowired + public MetricoController(PrometheusQueries prometheusQueries) { + this.prometheusQueries = prometheusQueries; + } @GetMapping("/live") public ResponseEntity<String> livenessCheck() { @@ -60,8 +67,11 @@ public class MetricoController { logger.atDebug().setMessage("/startPeriodicQuery endpoint called without a stopAfterSeconds. Job will not stop by itself.").log(); } String prom_url = request.getProtocol() + "://" + request.getProm_ip() + ":" + request.getProm_port(); - Job newPeriodicQuery = PrometheusQueries.startPeriodicQuery(prom_url, request.getQuery(), request.getStartDateTime(), request.getEndDateTime(), request.getExecutionInterval() + Job newPeriodicQuery = prometheusQueries.startPeriodicQuery(prom_url, request.getQuery(), request.getStartDateTime(), request.getEndDateTime(), request.getExecutionInterval() ); + if(newPeriodicQuery.getState()== ExecutionStateType.FAILED){ + return new ResponseEntity<>("Periodic query failed to start due to internal error.", HttpStatus.INTERNAL_SERVER_ERROR); + } return new ResponseEntity<>("Periodic query started, with ID: " + newPeriodicQuery.getUuid(), HttpStatus.OK); } diff --git a/src/main/java/org/etsi/osl/metrico/MetricoSpringBoot.java b/src/main/java/org/etsi/osl/metrico/MetricoSpringBoot.java index deb9d0d9b76ad1c38fb371a4a725da559904e85f..8940ad54b5f9e4bea29ccac409a6df550c06922f 100644 --- a/src/main/java/org/etsi/osl/metrico/MetricoSpringBoot.java +++ b/src/main/java/org/etsi/osl/metrico/MetricoSpringBoot.java @@ -6,13 +6,20 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.ExitCodeGenerator; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.context.ApplicationContext; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; @SpringBootApplication -@EnableJpaRepositories -public class MetricoSpringBoot implements CommandLineRunner { +@EnableJpaRepositories("org.etsi.osl.metrico.repo") +@EntityScan( basePackages = { + "org.etsi.osl.metrico.repo", + "org.etsi.osl.metrico.model", + "org.etsi.osl.metrico", + "org.etsi.osl.metrico.reposervices" +}) + public class MetricoSpringBoot implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(MetricoSpringBoot.class.getSimpleName()); diff --git a/src/main/java/org/etsi/osl/metrico/model/Job.java b/src/main/java/org/etsi/osl/metrico/model/Job.java index 9854ac5aa88e4b025eca8045ca5005c63d8a020d..fba21c0dbc72edb15c23cfa117fed3426b106b44 100644 --- a/src/main/java/org/etsi/osl/metrico/model/Job.java +++ b/src/main/java/org/etsi/osl/metrico/model/Job.java @@ -4,12 +4,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import jakarta.persistence.*; import lombok.Getter; import lombok.Setter; -import org.etsi.osl.metrico.repo.JobRepository; import org.etsi.osl.metrico.reposervices.JobRepoService; import org.etsi.osl.tmf.pm628.model.ExecutionStateType; import org.hibernate.annotations.GenericGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.format.annotation.DateTimeFormat; import java.time.Duration; @@ -39,6 +39,16 @@ public class Job{ private Integer executionInterval; + private UUID dataAccessEndPointRef; // Get the prometheus IP (uri) and its state + + private UUID scheduleDefinitionRef; //Get the startDateTime, endDateTime and executionInterval (recurringFrequency) + + // Get the reporting period and collection granularity(the periods are too big), + // Consuming / producing application id, + // scheduleDefinitionRef -> Get the startDateTime, endDateTime and executionInterval (recurringFrequency) + // Should I check the granularity or the scheduleDefinitionRef for the recurringFrequency? + private UUID measurementCollectionJobRef; + @JsonIgnore private boolean deleted = false; @@ -74,93 +84,4 @@ public class Job{ this(startDateTime, endDateTime); this.executionInterval = executionInterval; } - - - - - private static final Logger logger = LoggerFactory.getLogger(Job.class); - @Getter - private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private static final Map<UUID, Job> jobs = new ConcurrentHashMap<>(); - - private static JobRepoService jobRepoService; - - /** - * Schedules a new job to be executed periodically based on the provided parameters. - * <p> - * This method initializes a new {@link Job} instance with the specified start and end date times, and execution interval. - * It calculates the initial delay as the difference in seconds between the current time and the start date time. - * The job is initially set to a PENDING state. It then attempts to schedule the job to run at a fixed rate, - * starting after the initial delay and subsequently with the specified execution interval. - * If the job is successfully scheduled, its state is updated to INPROGRESS, and it is logged. - * In case of a scheduling failure due to a {@link RejectedExecutionException}, the job's state is set to FAILED, - * and the error is logged. - * </p> - * - * @param task the {@link Runnable} task that the job will execute - * @param startDateTime the {@link OffsetDateTime} specifying when the job should start - * @param endDateTime the {@link OffsetDateTime} specifying when the job should end - * @param executionInterval the interval, in seconds, between successive executions of the job - * @return the newly created and scheduled {@link Job} instance - * @throws RejectedExecutionException if the task cannot be scheduled for execution - */ - public static Job startJob(Runnable task, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) { - Job job = new Job(startDateTime, endDateTime, executionInterval); - long initialDelay = Duration.between(OffsetDateTime.now(), startDateTime).getSeconds(); - job.setState(ExecutionStateType.PENDING); - jobRepoService.createAndSaveJob(); - UUID jobId = job.getUuid(); - try { - ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task, initialDelay, executionInterval, TimeUnit.SECONDS); - job.setFuture(future); - jobs.put(jobId, job); - job.setState(ExecutionStateType.INPROGRESS); - logger.info("Job with ID {} started successfully.", jobId); - jobRepoService.updateJob(jobId, job.getState()); - } catch (RejectedExecutionException e) { - job.setState(ExecutionStateType.FAILED); - logger.error("Job with ID {} could not be scheduled.", jobId, e); - } - return job; - } - - /** - * Attempts to stop a job identified by its UUID. - * <p> - * This method checks if the job exists and its current state. If the job is already in a - * {@link ExecutionStateType#CANCELLED} or {@link ExecutionStateType#COMPLETED} state, it logs - * the status and returns. Otherwise, it attempts to cancel the job's future execution. - * If the cancellation is successful, the job's state is set to {@link ExecutionStateType#CANCELLED}, - * and it logs the successful stop. If the job cannot be stopped (e.g., already completed or cancelled), - * its state is set to {@link ExecutionStateType#PENDING}, and a warning is logged. If the job does not - * exist, it logs a warning. - * </p> - * - * @param jobId the UUID of the job to stop - */ - public static void stopJob(UUID jobId) { - Job job = jobs.get(jobId); - if (job != null) { - if (job.getState() == ExecutionStateType.CANCELLED ) { - logger.info("Job with ID {} is already CANCELED.", jobId); - return; - } else if (job.getState() == ExecutionStateType.COMPLETED) { - logger.info("Job with ID {} is already COMPLETED.", jobId); - return; - } - if (job.getFuture() != null) { - boolean wasCancelled = job.getFuture().cancel(true); - if (wasCancelled) { - job.setState(ExecutionStateType.CANCELLED); - logger.info("Job with ID {} stopped successfully.", jobId); - } else { - job.setState(ExecutionStateType.PENDING); - logger.warn("Job with ID {} could not be stopped because it has already completed, has been cancelled, or could not be cancelled for some other reason.", jobId); - } - } - } else { - logger.warn("Job with ID {} does not exist.", jobId); - } - } - } diff --git a/src/main/java/org/etsi/osl/metrico/prometheus/PrometheusQueries.java b/src/main/java/org/etsi/osl/metrico/prometheus/PrometheusQueries.java index 12fca75fcb4fb26df76a3a4684222cdc6143a3cb..22e002c5d6d60a2fd56329d7965bdbd3c2445773 100644 --- a/src/main/java/org/etsi/osl/metrico/prometheus/PrometheusQueries.java +++ b/src/main/java/org/etsi/osl/metrico/prometheus/PrometheusQueries.java @@ -2,10 +2,13 @@ package org.etsi.osl.metrico.prometheus; import org.etsi.osl.metrico.model.Job; +import org.etsi.osl.metrico.services.JobService; import org.etsi.osl.tmf.pm628.model.ExecutionStateType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; @@ -16,9 +19,15 @@ import java.util.concurrent.TimeUnit; /** * This class contains the Prometheus Queries for the metrics. */ +@Service public class PrometheusQueries { private static final Logger logger = LoggerFactory.getLogger(PrometheusQueries.class); + private final JobService jobService; + + public PrometheusQueries(JobService jobService) { + this.jobService = jobService; + } public static String sendQueryToPrometheus(String prometheusUrl, String query){ RestTemplate restTemplate = new RestTemplate(); @@ -33,7 +42,7 @@ public class PrometheusQueries { return response.getBody(); } - public static Job startPeriodicQuery(String prometheusUrl, String query, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) { + public Job startPeriodicQuery(String prometheusUrl, String query, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) { final Runnable queryHandler = new Runnable() { public void run() { @@ -42,13 +51,15 @@ public class PrometheusQueries { }).start(); } }; - Job job = Job.startJob(queryHandler, startDateTime, endDateTime, executionInterval); - + Job job = jobService.startJob(queryHandler, startDateTime, endDateTime, executionInterval); + if(job.getState() == ExecutionStateType.FAILED){ + return job; + } // Schedule a task to stop the job after the specified delay if (endDateTime != null) { long stopAfterSeconds = Duration.between(OffsetDateTime.now(), endDateTime).getSeconds(); - Job.getScheduler().schedule(() -> { - Job.stopJob(job.getUuid()); + JobService.getScheduler().schedule(() -> { + JobService.stopJob(job.getUuid()); job.setState(ExecutionStateType.COMPLETED); }, stopAfterSeconds, TimeUnit.SECONDS); } diff --git a/src/main/java/org/etsi/osl/metrico/repo/JobRepository.java b/src/main/java/org/etsi/osl/metrico/repo/JobRepository.java index b9673e9f18eb937bc8bba4518dce12cedc618351..714323e33ec3ba9cd4cf66c44d753da9521560e7 100644 --- a/src/main/java/org/etsi/osl/metrico/repo/JobRepository.java +++ b/src/main/java/org/etsi/osl/metrico/repo/JobRepository.java @@ -2,7 +2,10 @@ package org.etsi.osl.metrico.repo; import org.etsi.osl.metrico.model.Job; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + import java.util.UUID; +@Repository public interface JobRepository extends JpaRepository<Job, UUID> { } \ No newline at end of file diff --git a/src/main/java/org/etsi/osl/metrico/reposervices/JobRepoService.java b/src/main/java/org/etsi/osl/metrico/reposervices/JobRepoService.java index f078e684422186ba40637dbb591065f8f47372cb..dd843bbe1cc27042d48486693cdacb4f646e04a6 100644 --- a/src/main/java/org/etsi/osl/metrico/reposervices/JobRepoService.java +++ b/src/main/java/org/etsi/osl/metrico/reposervices/JobRepoService.java @@ -16,15 +16,17 @@ public class JobRepoService { private static final Logger logger = LoggerFactory.getLogger(JobRepoService.class); - // private final JobRepository jobRepository; - -// @Autowired -// public JobRepoService(JobRepository jobRepository) { -// this.jobRepository = jobRepository; -// } + private JobRepository jobRepository; @Autowired - JobRepository jobRepository; + public JobRepoService(JobRepository jobRepository) { + this.jobRepository = jobRepository; + if (jobRepository == null) { + logger.error("JobRepository is null. Dependency injection failed."); + } + } + + public Job createAndSaveJob() { Job job = new Job(); // Assuming Job has a default constructor diff --git a/src/main/java/org/etsi/osl/metrico/services/JobService.java b/src/main/java/org/etsi/osl/metrico/services/JobService.java new file mode 100644 index 0000000000000000000000000000000000000000..6280a9ebce3dfebf54be3ab97aa3843f4fd548b9 --- /dev/null +++ b/src/main/java/org/etsi/osl/metrico/services/JobService.java @@ -0,0 +1,118 @@ +package org.etsi.osl.metrico.services; + +import lombok.Getter; +import org.etsi.osl.metrico.model.Job; +import org.etsi.osl.metrico.reposervices.JobRepoService; +import org.etsi.osl.tmf.pm628.model.ExecutionStateType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.*; + +@Service +public class JobService { + + private static final Logger logger = LoggerFactory.getLogger(JobService.class); + private static final Map<UUID, Job> jobs = new ConcurrentHashMap<>(); + + @Getter + private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + private final JobRepoService jobRepoService; + + public JobService(JobRepoService jobRepoService) { + this.jobRepoService = jobRepoService; + } + + /** + * Schedules a new job to be executed periodically based on the provided parameters. + * <p> + * This method initializes a new {@link Job} instance with the specified start and end date times, and execution interval. + * It calculates the initial delay as the difference in seconds between the current time and the start date time. + * The job is initially set to a PENDING state. It then attempts to schedule the job to run at a fixed rate, + * starting after the initial delay and subsequently with the specified execution interval. + * If the job is successfully scheduled, its state is updated to INPROGRESS, and it is logged. + * In case of a scheduling failure due to a {@link RejectedExecutionException}, the job's state is set to FAILED, + * and the error is logged. + * </p> + * + * @param task the {@link Runnable} task that the job will execute + * @param startDateTime the {@link OffsetDateTime} specifying when the job should start + * @param endDateTime the {@link OffsetDateTime} specifying when the job should end + * @param executionInterval the interval, in seconds, between successive executions of the job + * @return the newly created and scheduled {@link Job} instance + * @throws RejectedExecutionException if the task cannot be scheduled for execution + */ + public Job startJob(Runnable task, OffsetDateTime startDateTime, OffsetDateTime endDateTime, Integer executionInterval) { + Job job = new Job(startDateTime, endDateTime, executionInterval); + long initialDelay = Duration.between(OffsetDateTime.now(), startDateTime).getSeconds(); + job.setState(ExecutionStateType.PENDING); + try{ + job = jobRepoService.createAndSaveJob(); + } catch (NullPointerException e){ + job.setState(ExecutionStateType.FAILED); + logger.error("JobRepo is null", e); + return job; + } + + UUID jobId = job.getUuid(); + try { + ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task, initialDelay, executionInterval, TimeUnit.SECONDS); + job.setFuture(future); + jobs.put(jobId, job); + job.setState(ExecutionStateType.INPROGRESS); + logger.info("Job with ID {} started successfully.", jobId); + jobRepoService.updateJob(jobId, job.getState()); + } catch (RejectedExecutionException e) { + job.setState(ExecutionStateType.FAILED); + logger.error("Job with ID {} could not be scheduled.", jobId, e); + } + return job; + } + + /** + * Attempts to stop a job identified by its UUID. + * <p> + * This method checks if the job exists and its current state. If the job is already in a + * {@link ExecutionStateType#CANCELLED} or {@link ExecutionStateType#COMPLETED} state, it logs + * the status and returns. Otherwise, it attempts to cancel the job's future execution. + * If the cancellation is successful, the job's state is set to {@link ExecutionStateType#CANCELLED}, + * and it logs the successful stop. If the job cannot be stopped (e.g., already completed or cancelled), + * its state is set to {@link ExecutionStateType#PENDING}, and a warning is logged. If the job does not + * exist, it logs a warning. + * </p> + * + * @param jobId the UUID of the job to stop + */ + public static void stopJob(UUID jobId) { + Job job = jobs.get(jobId); + if (job != null) { + if (job.getState() == ExecutionStateType.CANCELLED ) { + logger.info("Job with ID {} is already CANCELED.", jobId); + return; + } else if (job.getState() == ExecutionStateType.COMPLETED) { + logger.info("Job with ID {} is already COMPLETED.", jobId); + return; + } + if (job.getFuture() != null) { + boolean wasCancelled = job.getFuture().cancel(true); + if (wasCancelled) { + job.setState(ExecutionStateType.CANCELLED); + logger.info("Job with ID {} stopped successfully.", jobId); + } else { + job.setState(ExecutionStateType.PENDING); + logger.warn("Job with ID {} could not be stopped because it has already completed, has been cancelled, or could not be cancelled for some other reason.", jobId); + } + } + } else { + logger.warn("Job with ID {} does not exist.", jobId); + } + } + +} \ No newline at end of file