Skip to content
Snippets Groups Projects
Commit 9c0ed21a authored by tranoris's avatar tranoris
Browse files

fixes for metrico, webclient and resume jobs

parent dc464663
No related branches found
No related tags found
2 merge requests!5MR for Release 2024Q4,!1Creating first version of metrico
Pipeline #11084 failed
......@@ -100,6 +100,17 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- security -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-client</artifactId>
</dependency>
<!-- activeMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
......
package org.etsi.osl.metrico;
import org.etsi.osl.metrico.services.MetricoService;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJob;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJobRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import jakarta.validation.constraints.NotNull;
@SpringBootApplication
......@@ -25,17 +32,36 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
private static final Logger logger =
LoggerFactory.getLogger(MetricoSpringBoot.class.getSimpleName());
//
// @Autowired
// private MetricoService aMetricoService;
@Override
public void run(String... arg0) {
if (arg0.length > 0 && arg0[0].equals("exitcode")) {
throw new ExitException();
}
// @NotNull
// MeasurementCollectionJob mcj = new MeasurementCollectionJob();
//
// @NotNull
// MeasurementCollectionJobRef mjref = new MeasurementCollectionJobRef();
// mjref.setId("268cd191-21ac-4a81-84d8-933445a65ec1");
// aMetricoService.startPeriodicQueryToPrometheusRef(mjref );
}
public static void main(String[] args) {
logger.info("============================== STARTING METRICO ==============================");
ApplicationContext applicationContext = new SpringApplication(MetricoSpringBoot.class).run(args);
}
static class ExitException extends RuntimeException implements ExitCodeGenerator {
......
package org.etsi.osl.metrico.prometheus;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.ProducerTemplate;
import org.etsi.osl.metrico.mapper.DataAccessEndpointMapper;
import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.services.JobService;
import org.etsi.osl.tmf.common.model.Any;
import org.etsi.osl.tmf.common.model.service.Characteristic;
import org.etsi.osl.tmf.pm628.model.DataAccessEndpoint;
import org.etsi.osl.tmf.pm628.model.DataAccessEndpointMVO;
import org.etsi.osl.tmf.pm628.model.DataFilterAttributeStringArray;
import org.etsi.osl.tmf.pm628.model.DataFilterMap;
import org.etsi.osl.tmf.pm628.model.DataFilterMapItem;
import org.etsi.osl.tmf.pm628.model.DataFilterTemplate;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJob;
import org.etsi.osl.tmf.ri639.model.LogicalResource;
......@@ -22,22 +25,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Mono;
@Component
public class PrometheusQueries {
......@@ -71,7 +65,7 @@ public class PrometheusQueries {
}
public String sendQueryToPrometheus(String prometheusUrl, String query, MeasurementCollectionJob mcj, Job job) {
RestTemplate restTemplate = new RestTemplate();
//RestTemplate restTemplate = new RestTemplate();
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(prometheusUrl)
.path("/api/v1/query")
......@@ -79,29 +73,72 @@ public class PrometheusQueries {
logger.atInfo().log("Sent query at prometheus with URL: " + prometheusUrl + " with query: " + query);
logger.atInfo().log("Sent query builder.toUriString(): " + builder.toUriString() );
ResponseEntity<String> response = restTemplate.getForEntity(builder.toUriString(), String.class);
//ResponseEntity<String> response = restTemplate.getForEntity(builder.toUriString(), String.class);
WebClient webclient = WebClient.create();
logger.atDebug().log("Received " + response.getBody());
String response="";
String [] promResponse =response.getBody().split("\n");
if ( webclient!=null ) {
try {
String url = builder.toUriString();
response = webclient.get()
.uri(url)
//.header("Authorization", "Basic " + encodedClientData)
//.attributes( ServletOAuth2AuthorizedClientExchangeFilterFunction.clientRegistrationId("authOpensliceProvider"))
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, r -> {
logger.error("4xx eror");
return Mono.error(new RuntimeException("4xx"));
})
.onStatus(HttpStatusCode::is5xxServerError, r -> {
logger.error("5xx eror");
return Mono.error(new RuntimeException("5xx"));
})
.bodyToMono( new ParameterizedTypeReference<String>() {})
.block();
//patch TMF service
String serviceUUID = mcj.getConsumingApplicationId();
logger.atDebug().log("serviceUUID= " + serviceUUID);
logger.atDebug().log("Received " + response);
String [] promResponse =response.split("\n");
}catch (Exception e) {
logger.error(" error on web client request");
response = "{\"status\":\""+ e.getLocalizedMessage() +"\"";
e.printStackTrace();
}
} else {
logger.error("WebClient is null. Cannot be created.");
response = "{\"status\":\"Cannot connect\"";
}
ServiceUpdate su = new ServiceUpdate();
Characteristic serviceCharacteristicItem = new Characteristic();
serviceCharacteristicItem.setName( mcj.getOutputFormat() );
serviceCharacteristicItem.setValueType( "TEXT" );
Any val = new Any();
val.setValue( response.getBody() );
val.setAlias( "");
serviceCharacteristicItem.setValue( val );
su.addServiceCharacteristicItem(serviceCharacteristicItem);
updateService(serviceUUID, su , true);
return response.getBody();
ServiceUpdate su = new ServiceUpdate();
//patch TMF service
String serviceUUID = mcj.getConsumingApplicationId();
logger.atDebug().log("serviceUUID= " + serviceUUID);
Characteristic serviceCharacteristicItem = new Characteristic();
serviceCharacteristicItem.setName( mcj.getOutputFormat() );
serviceCharacteristicItem.setValueType( "TEXT" );
Any val = new Any();
val.setValue( response );
val.setAlias( "");
serviceCharacteristicItem.setValue( val );
su.addServiceCharacteristicItem(serviceCharacteristicItem);
updateService(serviceUUID, su , true);
return response;
}
public Job startPeriodicQuery(String prometheusUrl, String query, final Job ajob, MeasurementCollectionJob mcj) {
......@@ -116,20 +153,20 @@ public class PrometheusQueries {
job.setExecutionInterval(180);
}
final Runnable queryHandler = () -> sendQueryToPrometheus(prometheusUrl, query, mcj, ajob);
job = jobService.startJob(queryHandler, job);
if (job.getState() == ExecutionStateType.FAILED) {
return job;
jobService.stopJob(ajob);
return null;
}
if (job.getEndDateTime() != null) {
long stopAfterSeconds = Duration.between(OffsetDateTime.now(), job.getEndDateTime() ).getSeconds();
UUID jobid = job.getUuid();
JobService.getScheduler().schedule(() -> {
jobService.stopJob( jobid );
//job.setState(ExecutionStateType.COMPLETED);
jobService.stopJob(ajob);
}, stopAfterSeconds, TimeUnit.SECONDS);
}
......
......@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
@Service
......@@ -66,15 +67,21 @@ public class JobRepoService {
return jobRepository.save(job);
}
public boolean softDeleteJob(UUID jobId) {
Job job = jobRepository.findById(jobId).orElse(null);
public boolean softDeleteJob(Job ajob) {
Job job = jobRepository.findById(ajob.getUuid()).orElse(null);
if (job == null) {
logJobNotFoundError(jobId);
logJobNotFoundError( ajob.getUuid() );
return false;
}
logger.error("Job with ID {} will be DELETED.", ajob.getUuid() );
job.setDeleted(true);
jobRepository.save(job);
jobRepository.delete(job);
return true;
}
public List<Job> findAll() {
return (List<Job>) this.jobRepository.findAll();
}
}
......@@ -4,14 +4,18 @@ import lombok.Getter;
import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.reposervices.JobRepoService;
import org.etsi.osl.tmf.pm628.model.ExecutionStateType;
import org.etsi.osl.tmf.pm628.model.MeasurementCollectionJobRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
......@@ -20,8 +24,6 @@ import java.util.concurrent.*;
public class JobService {
private static final Logger logger = LoggerFactory.getLogger(JobService.class);
@Getter
private static final Map<UUID, Job> jobs = new ConcurrentHashMap<>();
@Value("${METRICO_THREAD_POOL_SIZE}")
private static int threadPoolSize;
......@@ -33,6 +35,10 @@ public class JobService {
public JobService(JobRepoService jobRepoService) {
this.jobRepoService = jobRepoService;
}
/**
* Schedules a new job to be executed periodically based on the provided parameters.
......@@ -54,7 +60,14 @@ public class JobService {
* @throws RejectedExecutionException if the task cannot be scheduled for execution
*/
public Job startJob(Runnable task, Job job) {
if ( job.getEndDateTime().isBefore( OffsetDateTime.now() )) {
job.setState(ExecutionStateType.FAILED);
logger.error("Job with ID {} End Date is before now, the job will not be scheduled.", job.getUuid());
return job;
}
long initialDelay = Duration.between(OffsetDateTime.now(), job.getStartDateTime()).getSeconds();
if (initialDelay<0) {
initialDelay = 0;
......@@ -73,7 +86,6 @@ public class JobService {
job = jobRepoService.updateJob( job.getUuid(), job.getState());
jobs.put(job.getUuid(), job);
return job;
}
......@@ -92,33 +104,33 @@ public class JobService {
*
* @param jobId the UUID of the job to stop
*/
public void stopJob(UUID jobId) {
Job job = jobs.get(jobId);
public void stopJob(Job job) {
if (job != null) {
if (job.getState() == ExecutionStateType.CANCELLED ) {
logger.info("Job with ID {} is already CANCELED.", jobId);
jobRepoService.updateJob(jobId, job.getState());
logger.info("Job with ID {} is already CANCELED.", job.getUuid());
jobRepoService.softDeleteJob( job );
return;
} else if (job.getState() == ExecutionStateType.COMPLETED) {
logger.info("Job with ID {} is already COMPLETED.", jobId);
jobRepoService.updateJob(jobId, job.getState());
logger.info("Job with ID {} is already COMPLETED.", job.getUuid());
jobRepoService.softDeleteJob( job );
return;
}
if (job.getFuture() != null) {
boolean wasCancelled = job.getFuture().cancel(true);
if (wasCancelled) {
job.setState(ExecutionStateType.CANCELLED);
jobRepoService.updateJob(jobId, job.getState());
logger.info("Job with ID {} stopped successfully.", jobId);
jobRepoService.updateJob(job.getUuid(), job.getState());
logger.info("Job with ID {} stopped successfully.", job.getUuid());
} else {
job.setState(ExecutionStateType.PENDING);
jobRepoService.updateJob(jobId, job.getState());
logger.warn("Job with ID {} could not be stopped because it has already completed, has been cancelled, or could not be cancelled for some other reason.", jobId);
jobRepoService.updateJob( job.getUuid(), job.getState());
logger.warn("Job with ID {} could not be stopped because it has already completed, has been cancelled, or could not be cancelled for some other reason.", job.getUuid());
}
}
} else {
logger.warn("Job with ID {} does not exist.", jobId);
logger.warn("Job does not exist.");
}
}
......
......@@ -8,6 +8,8 @@ import org.etsi.osl.metrico.mapper.DataAccessEndpointMapper;
import org.etsi.osl.metrico.mapper.JobMapper;
import org.etsi.osl.metrico.model.Job;
import org.etsi.osl.metrico.prometheus.PrometheusQueries;
import org.etsi.osl.metrico.repo.JobRepository;
import org.etsi.osl.metrico.reposervices.JobRepoService;
import org.etsi.osl.tmf.common.model.service.ResourceRef;
import org.etsi.osl.tmf.pm628.model.*;
import org.etsi.osl.tmf.ri639.model.LogicalResource;
......@@ -18,6 +20,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
......@@ -40,9 +44,13 @@ public class MetricoService extends RouteBuilder {
@Value("${PM_MEASUREMENT_COLLECTION_GET_JOB_BY_ID}")
private String PM_MEASUREMENT_COLLECTION_GET_JOB_BY_ID = "";
@Autowired
private JobRepoService jobRepoService;
@Autowired
private JobRepository jobRepository;
private final PrometheusQueries prometheusQueries;
private final ProducerTemplate producerTemplate;
......@@ -50,6 +58,32 @@ public class MetricoService extends RouteBuilder {
this.prometheusQueries = prometheusQueries;
this.producerTemplate = producerTemplate;
}
/**
* This one is executed when the cridge service application starts
* @param event
*/
@EventListener(ApplicationStartedEvent.class)
public void onApplicationEvent() {
List<Job> jobsPending = jobRepoService.findAll();
logger.info("===== Pending jobs from previous sessions =====");
for (Job job : jobsPending) {
logger.info("try to resume jobuuid: {}, state:{}, start: {}, end:{}, mcjid: {} ", job.getUuid(), job.getState(), job.getStartDateTime(), job.getEndDateTime(), job.getMeasurementCollectionJobRef() );
//delete previous job from DB. a new one will be created
jobRepoService.softDeleteJob(job);
MeasurementCollectionJobRef mjref = new MeasurementCollectionJobRef();
mjref.setId( job.getMeasurementCollectionJobRef().toString() );
this.startPeriodicQueryToPrometheusRef(mjref );
}
logger.info("===== Pending jobs from previous sessions done =====");
}
@Autowired
......@@ -149,7 +183,7 @@ public class MetricoService extends RouteBuilder {
logger.debug("updateRelatedResource servUUID = " + servUUID );
org.etsi.osl.tmf.sim638.model.Service aService = prometheusQueries.retrieveService(servUUID);
if ( aService.getSupportingResource().size()>0) {
if ( aService!=null && aService.getSupportingResource().size()>0) {
ResourceRef resRef = aService.getSupportingResource().stream().findFirst().get();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment