Routers are now stopped. For next steps I need to see why I get Exceptions

This commit is contained in:
Katerina 2023-06-12 15:48:10 +03:00
parent 376715c1af
commit 8eae2632a1
8 changed files with 151 additions and 127 deletions

View File

@ -79,6 +79,11 @@
<artifactId>camel-spring-boot</artifactId>
<version>${apache.camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-core-starter</artifactId>
<version>${apache.camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>

View File

@ -2,16 +2,14 @@ package eu.dnetlib.validatorapi.controllers;
import eu.dnetlib.validatorapi.entities.SummaryResult;
import eu.dnetlib.validatorapi.entities.ValidationIssue;
import eu.dnetlib.validatorapi.entities.ValidationJob;
import eu.dnetlib.validatorapi.repositories.ValidationIssueRepository;
import eu.dnetlib.validatorapi.repositories.ValidationJobRepository;
import eu.dnetlib.validatorapi.repositories.ValidationResultRepository;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@RestController
@CrossOrigin(origins = "*")
@ -29,9 +27,13 @@ public class ReportController {
this.validationIssueRepository = validationIssueRepository;
}
@RequestMapping(value={"getJobResult"}, method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
public ValidationJob getJobResults(@RequestParam(name = "jobId") int jobId){
Optional<ValidationJob> validationJob = validationJobRepository.findById(jobId);
return (ValidationJob) validationJob.orElse(null);
}
@RequestMapping(value = {"getResultsByJobId"}, method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
public List<SummaryResult> getJobResults(@RequestParam(name = "jobId") int jobId){
public List<SummaryResult> getSummaryJobResults(@RequestParam(name = "jobId") int jobId){
return validationResultRepository.getSummaryResult(jobId);
}

View File

@ -78,10 +78,11 @@ public class ValidationController {
log.error("Exception: No valid guidelines");
new Exception("Validation Job stopped unexpectedly. No valid guidelines were provided.");
}
System.out.println("AAAAAAAA");
ValidationJob validationJob = new ValidationJob(baseURL, numberOfRecords);
validationJob.guidelines = profile.name();
validationJobRepository.save(validationJob);
System.out.println("id " + validationJob.id);
int record = 0;
double resultSum = 0;
@ -125,6 +126,7 @@ public class ValidationController {
}
record++;
System.out.println(record++);
}
}
validationJob.status = "COMPLETED";
@ -141,7 +143,7 @@ public class ValidationController {
validationJob.recordsTested = record;
validationJob.score = resultSum / record;
//TODO uncomment
// validationJobRepository.save(validationJob);
validationJobRepository.save(validationJob);
}
//xmlValidationResponse.setRules(resultRules);
@ -179,9 +181,10 @@ public class ValidationController {
@RequestMapping(value = {"/realValidator"}, method = RequestMethod.GET)
public void validateWithApacheCamel(@RequestParam(name = "guidelines") String guidelinesProfileName,
@RequestParam(name = "baseUrl", defaultValue = "http://repositorium.sdum.uminho.pt/oai/request") String baseURL, //not in use now
@RequestParam(name="numberOfRecords", defaultValue = "10") int numberOfRecords, //not in use now
@RequestParam(name="set", required = false) String set, @RequestParam(name="metadataPrefix", defaultValue = "oai_dc") String metadataPrefix) {
@RequestParam(name = "baseUrl", defaultValue = "http://repositorium.sdum.uminho.pt/oai/request") String baseURL, //not in use now
@RequestParam(name="numberOfRecords", defaultValue = "10") int numberOfRecords,
@RequestParam(name="set", required = false) String set, //not in use now
@RequestParam(name="metadataPrefix", defaultValue = "oai_dc") String metadataPrefix) {
AbstractOpenAireProfile profile = initializeOpenAireProfile(guidelinesProfileName);
AbstractOpenAireProfile fairProfile = initializeFairProfile(guidelinesProfileName);
@ -194,7 +197,10 @@ public class ValidationController {
ValidationJob validationJob = new ValidationJob(baseURL, numberOfRecords);
validationJob.guidelines = profile.name();
validationJob.status = "IN_PROGRESS";
validationJobRepository.save(validationJob);
log.info("Initial validation job id "+ validationJob.id);
int record = 0;
double resultSum = 0;
@ -207,8 +213,6 @@ public class ValidationController {
new OaiPmhRoute("oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix ,
profile, validationJob, numberOfRecords, validationIssueRepository, validationResultRepository);
camelContext.addRoutes(oaiPmhRouteBuilder);
//camelContext.start();
//camelContext.stop();
validationJob.status = "COMPLETED";
}
@ -219,7 +223,7 @@ public class ValidationController {
} finally {
validationJob.endDate = new Date();
System.out.println("Final validation job "+ validationJob.hashCode());
log.info("Final validation job "+ validationJob.id);
validationJob.recordsTested = record;
validationJob.score = resultSum / record;
validationJobRepository.save(validationJob);
@ -378,8 +382,9 @@ public class ValidationController {
AbstractOpenAireProfile fairProfile = initializeFairProfile(guidelinesProfileName);
validationJob.guidelines = profile.name();
System.out.println("Initial validation job "+ validationJob + " | " + validationJob.hashCode() + "\n");
validationJobRepository.save(validationJob);
log.info("Initial validation job id "+ validationJob.id);
int record = 0;
double resultSum = 0;
@ -425,19 +430,15 @@ public class ValidationController {
}
catch (Exception e) {
log.error("Validation job stopped unexpectedly." + e.getMessage());
System.out.println("ERROR " + e.getMessage());
validationJob.status = "STOPPED";
} finally {
validationJob.endDate = new Date();
System.out.println("Final validation job "+ validationJob.hashCode());
log.info("Final validation job "+ validationJob.id);
validationJob.recordsTested = record;
validationJob.score = resultSum / record;
validationJobRepository.save(validationJob);
}
//xmlValidationResponse.setRules(resultRules);
//xmlValidationResponse.setFairRules(fairRules);
}

View File

@ -25,7 +25,7 @@ public class ValidationJob {
@Column(name = "records_tested")
public int recordsTested;
@Column(name="duration")
public String status; //stopped, completed, in progres
public String status; //stopped, completed, in progress
@Column(name="score")
public double score;

View File

@ -0,0 +1,4 @@
package eu.dnetlib.validatorapi.processors;
public class StopRouteProcessor {
}

View File

@ -23,35 +23,33 @@ import java.util.Map;
public class XmlProcessor implements Processor {
private final AbstractOpenAireProfile profile;
private final ValidationJob validationJob;
private long processedRecords;
private final ValidationIssueRepository validationIssueRepository;
private final ValidationResultRepository validationResultRepository;
private final long maxNumberOfRecords;
private long processedRecords;
public XmlProcessor(final AbstractOpenAireProfile profile, final ValidationJob validationJob,
final ValidationIssueRepository validationIssueRepository,
final ValidationResultRepository validationResultRepository){
final ValidationResultRepository validationResultRepository,
final long maxNumberOfRecords){
super();
this.profile = profile;
this.validationJob = validationJob;
this.validationIssueRepository = validationIssueRepository;
this.validationResultRepository = validationResultRepository;
this.maxNumberOfRecords = maxNumberOfRecords;
}
@Override
public void process(Exchange exchange) throws Exception {
String recordXml = exchange.getIn().getBody(String.class);
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
final DocumentBuilder documentBuilder = dbf.newDocumentBuilder();
Document doc = documentBuilder.parse(new InputSource(new StringReader(recordXml)));
String recordUrl = extractRecordUrl(doc, "identifier");
processedRecords++;
System.out.println("processed " + processedRecords);
exchange.setProperty("totalRecords", processedRecords);
if (profile != null) {
XMLApplicationProfile.ValidationResult validationResult = profile.validate("id", doc); //what id is that?
System.out.println("DC IDENTIFIER: " + recordUrl);
@ -66,100 +64,31 @@ public class XmlProcessor implements Processor {
constructValidationRuleResult(validationRuleResult, validationJob.id, recordUrl,
ruleName, profile, engineResult);
validationResultRepository.save(validationRuleResult);
saveValidationIssues(validationJob.id, recordUrl, ruleName, engineResult);
// System.out.println(validationRuleResult + " | " + validationRuleResult.hashCode() + "\n");
}
}
}
private void saveValidationIssues(int validationJobId, String recordUrl, String ruleName, Guideline.Result engineResult) {
for (String error:engineResult.errors()) {
/*System.out.println("11111");*/
ValidationIssue validationIssue = new ValidationIssue();
validationIssue.validationJobId = validationJobId;
validationIssue.ruleName = ruleName;
validationIssue.recordUrl = recordUrl;
validationIssue.issueType = "ERROR";
validationIssue.issueText = error;
validationIssueRepository.save(validationIssue);
/*
System.out.println(validationIssue);
*/
}
for (String warning: engineResult.warnings()){
/*
System.out.println("22222");
*/
ValidationIssue validationIssue = new ValidationIssue();
validationIssue.validationJobId = validationJobId;
validationIssue.ruleName = ruleName;
validationIssue.recordUrl = recordUrl;
validationIssue.issueType = "WARNING";
validationIssue.issueText = warning;
validationIssueRepository.save(validationIssue);
/*
System.out.println(validationIssue);
*/
}
}
/*public void process2(Exchange exchange) throws Exception {
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
String recordXml = exchange.getIn().getBody(String.class);
// System.out.println("\n\n\n\n\n\n\n\n\n\n\n\n");
System.out.println(recordXml);
// System.out.println("\n\n\n\n\n\n\n\n\n\n\n\n");
Document doc = db.parse(new InputSource(new StringReader(recordXml)));
if (profile != null) {
XMLApplicationProfile.ValidationResult validationResult = profile.validate("id", doc); //what id is that?
String recordUrl = extractRecordUrl(doc, "identifier");
System.out.println("DC IDENTIFIER: " + recordUrl);
Map<String, Guideline.Result> results = validationResult.results();
for (Map.Entry entry : results.entrySet()) {
Guideline.Result engineResult = (Guideline.Result) entry.getValue();
// System.out.println("engine result > " + engineResult);
String ruleName = entry.getKey().toString();
// System.out.println("rulename: " + recordUrl);
ValidationRuleResult validationRuleResult = new ValidationRuleResult();
// System.out.println("validationjob.id " + validationJob.id);
// System.out.println("recordUrl " + recordUrl);
// System.out.println("rulename " + ruleName);
// System.out.println("profile " + profile);
// System.out.println("engineResult " + engineResult);
constructValidationRuleResult(validationRuleResult, validationJob.id, recordUrl,
ruleName, profile, engineResult);
// System.out.println("xaxaxa");
// System.out.println(validationRuleResult + " | " + validationRuleResult.hashCode() + "\n");
// resultSum += engineResult.score();
// elements = nodeListToList(recordNodes); // assign the Elements to the variable
saveValidationIssues(validationJob.id, recordUrl, ruleName, engineResult.errors(), "ERROR");
saveValidationIssues(validationJob.id, recordUrl, ruleName, engineResult.warnings(), "WARNING");
}
}
processedRecords++;
System.out.println("processed " + processedRecords);
exchange.setProperty("totalRecords", processedRecords);
}*/
if (processedRecords > maxNumberOfRecords) {
exchange.getIn().setHeader("MyHeader", "stop");
}
}
private void saveValidationIssues(int validationJobId, String recordUrl, String ruleName,
Iterable<String> issues, String issueType) {
for (String issue:issues) {
ValidationIssue validationIssue = new ValidationIssue();
validationIssue.validationJobId = validationJobId;
validationIssue.ruleName = ruleName;
validationIssue.recordUrl = recordUrl;
validationIssue.issueType = issueType;
validationIssue.issueText = issue;
validationIssueRepository.save(validationIssue);
}
}
//TODO consider throwing exception - UTIL class?
private String extractRecordUrl(final Document document, final String xmlField) {

View File

@ -5,6 +5,6 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface ValidationJobRepository extends JpaRepository<ValidationJob, Long> {
public interface ValidationJobRepository extends JpaRepository<ValidationJob, Integer> {
}

View File

@ -9,6 +9,9 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class OaiPmhRoute extends RouteBuilder {
private String oaiEndpoint;
@ -18,6 +21,8 @@ public class OaiPmhRoute extends RouteBuilder {
private ValidationJob validationJob;
private final ValidationIssueRepository validationIssueRepository;
private final ValidationResultRepository validationResultRepository;
CountDownLatch latch;
public OaiPmhRoute(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob,
long maxNumberOfRecords, final ValidationIssueRepository validationIssueRepository,
@ -28,21 +33,100 @@ public class OaiPmhRoute extends RouteBuilder {
this.maxNumberOfRecords = maxNumberOfRecords;
this.validationIssueRepository = validationIssueRepository;
this.validationResultRepository = validationResultRepository;
this.latch = latch;
}
@Override
public void configure() throws Exception {
String date = new Date().toString();
from(oaiEndpoint)
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 10")
from(oaiEndpoint).routeId(date)
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository, maxNumberOfRecords))
.choice().when(header("MyHeader").isEqualTo("stop"))
.to("controlbus:route?routeId="+date+"&action=stop")
.endChoice();
/*from("timer://myTimer?fixedRate=true&period=60000")
.routeId("OAIProcessingRoute")
.setHeader("MyHeader", constant("stop"))
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.toD("direct:oaiEndpoint")
.otherwise()
.stop()
.end();
from("timer://myTimer?fixedRate=true&period=600000")
.to(oaiEndpoint)
.log("hello")
.split(xpath("//*[local-name()='record']"))
.log("${body}")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.log("????");
/* from("direct:processXML")
//.split(xpath("//*[local-name()='record']"))
.log("${exchangeProperty.totalRecords}")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.log("${exchangeProperty.totalRecords}");
String date = new Date().toString();
/*
from(oaiEndpoint)
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.to("mock:start")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// stop Camel by signalling to the latch
latch.countDown();
}
}).to("mock:done");
*/
/*
from(oaiEndpoint)
.routeId(date)
.setHeader("MyHeader", constant("start"))
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.log("Myheader " +"${header.myHeader}")
.to("controlbus:route?routeId="+date+"&action=stop")
.stop()
.endChoice()
.otherwise()
.process(exchange -> exchange.getIn().setHeader("MyHeader", "stop"))
.log("Trying to make the thread stop! " +"${header.myHeader}")
.endChoice();
*/
/* from(oaiEndpoint)
.setProperty("totalRecords", constant(0))
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 5"))
.log("${exchangeProperty.totalRecords}")
//.split(xpath("//*[local-name()='record']"))
//.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
.to("direct:ListRecords")
.end();
from("direct:ListRecords")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository));
/* from(oaiEndpoint)
.setProperty("totalRecords", constant(0))
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 10"))
.split(xpath("//*[local-name()='record']")).streaming()
.log("${exchangeProperty.totalRecords}")
// .log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.end().to("direct:end");
.log("${exchangeProperty.totalRecords}")
.end().to("direct:end");
*/
/* from("oaipmh://http://repositorium.sdum.uminho.pt/oai/request?verb=ListRecords&metadataPrefix=oai_dc")
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 10")
@ -60,7 +144,7 @@ public class OaiPmhRoute extends RouteBuilder {
.process(new XmlProcessor(profile, validationJob))
.end()
.to("direct:end");
**/
/***/
//from(oaiEndpoint).process(new XmlProcessor(profile,validationJob));
// from("direct:oaiRequest").process(new DummyXMLProcessor());
@ -122,8 +206,7 @@ public class OaiPmhRoute extends RouteBuilder {
class RecordCountProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("RecordCountProcessor");
System.out.println("RecordCountProcessor reads");
int processedRecords = exchange.getProperty("processedRecords", 0, Integer.class);
exchange.setProperty("totalRecords", processedRecords);