diff --git a/src/main/java/eu/dnetlib/validatorapi/controllers/ValidationController.java b/src/main/java/eu/dnetlib/validatorapi/controllers/ValidationController.java index 2ef0bc9..c3d2087 100644 --- a/src/main/java/eu/dnetlib/validatorapi/controllers/ValidationController.java +++ b/src/main/java/eu/dnetlib/validatorapi/controllers/ValidationController.java @@ -9,7 +9,7 @@ import eu.dnetlib.validatorapi.entities.ValidationRuleResult; import eu.dnetlib.validatorapi.repositories.ValidationIssueRepository; import eu.dnetlib.validatorapi.repositories.ValidationJobRepository; import eu.dnetlib.validatorapi.repositories.ValidationResultRepository; -import eu.dnetlib.validatorapi.routes.OaiPmhRoute; +import eu.dnetlib.validatorapi.routes.OaiPmhRoute2; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; @@ -40,10 +40,7 @@ import java.io.StringReader; import java.io.StringWriter; import java.net.HttpURLConnection; import java.net.URL; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; @RestController @CrossOrigin(origins = "*") @@ -211,9 +208,13 @@ public class ValidationController { DocumentBuilder db = dbf.newDocumentBuilder(); + UUID uuid = UUID.randomUUID(); + System.out.println("\n\n\n " + uuid); RouteBuilder oaiPmhRouteBuilder = - new OaiPmhRoute("oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix , - profile, validationJob, numberOfRecords); + new OaiPmhRoute2("oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix , + profile, validationJob, numberOfRecords, uuid.toString()); + + /* RouteBuilder oaiPmhRouteBuilder = @@ -520,4 +521,26 @@ public class ValidationController { return records; } + + @RequestMapping(value = {"/test"}, method = RequestMethod.GET) + public void test(@RequestParam(name = "guidelines") String guidelinesProfileName, + @RequestParam(name = "baseUrl", defaultValue = "http://repositorium.sdum.uminho.pt/oai/request") String baseURL //not in use now + //@RequestParam(name="metadataPrefix", defaultValue = "oai_dc") String metadataPrefix + ) { + + AbstractOpenAireProfile profile = initializeOpenAireProfile(guidelinesProfileName); + AbstractOpenAireProfile fairProfile = initializeFairProfile(guidelinesProfileName); + String metadataPrefix = initializeMetadataPrefix(guidelinesProfileName); + + String endpoint = "oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix; + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); +// String response = producerTemplate.requestBodyAndHeader("direct:oaipmhRequest", null, "endpoint", endpoint, String.class); + + + + producerTemplate.sendBody("direct:startProcessing", endpoint); + + //String response = camelContext.createProducerTemplate().requestBody("direct:oaipmhRequest", endpoint, String.class); + } + } diff --git a/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute.java b/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute.java index 12cd916..c1665af 100644 --- a/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute.java +++ b/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute.java @@ -11,6 +11,7 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import java.util.Date; +import java.util.UUID; import java.util.concurrent.CountDownLatch; public class OaiPmhRoute extends RouteBuilder { @@ -20,21 +21,20 @@ public class OaiPmhRoute extends RouteBuilder { private AbstractOpenAireProfile profile; private long maxNumberOfRecords = 0; private ValidationJob validationJob; - //private final ValidationJobRepository validationJobRepository; - //private final ValidationIssueRepository validationIssueRepository; - //private final ValidationResultRepository validationResultRepository; - CountDownLatch latch; + CountDownLatch latch; + String routeid; + String routeid2; public OaiPmhRoute(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob, - long maxNumberOfRecords) { + long maxNumberOfRecords, String routeid) { this.oaiEndpoint = oaiEndpoint; this.validationJob = validationJob; this.profile = profile; this.maxNumberOfRecords = maxNumberOfRecords; - // this.validationJobRepository = validationJobRepository; - // this.validationIssueRepository = validationIssueRepository; - // this.validationResultRepository = validationResultRepository; + this.routeid = routeid; + this.routeid2 = UUID.randomUUID().toString(); + this.latch = latch; } @@ -44,35 +44,35 @@ public class OaiPmhRoute extends RouteBuilder { String date = new Date().toString(); from(oaiEndpoint) - .routeId(date) - .split(xpath("//*[local-name()='record']")) - .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) - .choice() - .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) - .log("HERE!!!") - .split(simple("${body[issues]}")) - .to("jpa:"+ ValidationRuleResult.class.getName()+ "?usePersist=true") - .endChoice() - .end() - .choice() - .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) - .log("THERE") - .split(simple("${body[results]}")) - .to("jpa:" + ValidationIssue.class.getName() + "?usePersist=true") - .endChoice() - .end() - .choice() - .when(header("MyHeader").isEqualTo("stop")) - .to("direct:saveToDatabase") - .to("controlbus:route?routeId="+date+"&action=stop") - .endChoice() - .end(); - + .routeId(routeid) + .split(xpath("//*[local-name()='record']")) + .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) + .choice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() + .end() + .choice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() + .end() + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .endChoice() + .end(); +/* from("direct:saveToDatabase") + .routeId(routeid2) .process(new DataBaseProcessor()) .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true"); - +*/ /*from("timer://myTimer?fixedRate=true&period=60000") .routeId("OAIProcessingRoute") .setHeader("MyHeader", constant("stop")) diff --git a/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute2.java b/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute2.java new file mode 100644 index 0000000..9589b95 --- /dev/null +++ b/src/main/java/eu/dnetlib/validatorapi/routes/OaiPmhRoute2.java @@ -0,0 +1,270 @@ +package eu.dnetlib.validatorapi.routes; + +import eu.dnetlib.validator2.validation.guideline.openaire.AbstractOpenAireProfile; +import eu.dnetlib.validator2.validation.guideline.openaire.FAIR_Data_GuidelinesProfile; +import eu.dnetlib.validatorapi.entities.ValidationIssue; +import eu.dnetlib.validatorapi.entities.ValidationJob; +import eu.dnetlib.validatorapi.entities.ValidationRuleResult; +import eu.dnetlib.validatorapi.processors.DataBaseProcessor; +import eu.dnetlib.validatorapi.processors.XmlProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +public class OaiPmhRoute2 extends RouteBuilder { + + private String oaiEndpoint; + + private AbstractOpenAireProfile profile; + private long maxNumberOfRecords = 0; + private ValidationJob validationJob; + CountDownLatch latch; + String routeid; + String routeid2; + + + public OaiPmhRoute2(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob, + long maxNumberOfRecords, String routeid) { + this.oaiEndpoint = oaiEndpoint; + this.validationJob = validationJob; + this.profile = profile; + this.maxNumberOfRecords = maxNumberOfRecords; + this.routeid = routeid; + this.routeid2 = UUID.randomUUID().toString(); + + this.latch = latch; + } + + @Override + public void configure() throws Exception { + + String date = new Date().toString(); + + from(oaiEndpoint) + .multicast().parallelProcessing() + .to("direct:guidelinesProcessor") + .to("direct:fairProcessor") + .end(); + + from("direct:guidelinesProcessor") + .routeId(routeid) + .split(xpath("//*[local-name()='record']")) + .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) + .choice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() + .end() + .choice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() + .end() + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .endChoice() + .end(); + + from("direct:fairProcessor") + .routeId(routeid2) + .split(xpath("//*[local-name()='record']")) + .process(new XmlProcessor(new FAIR_Data_GuidelinesProfile(), validationJob, maxNumberOfRecords)) + .choice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() + .end() + .choice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() + .end() + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid2+"&action=stop") + .endChoice() + .end(); + +/* + from("direct:saveToDatabase") + .routeId(routeid2) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true"); +*/ + /*from("timer://myTimer?fixedRate=true&period=60000") + .routeId("OAIProcessingRoute") + .setHeader("MyHeader", constant("stop")) + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .toD("direct:oaiEndpoint") + .otherwise() + .stop() + .end(); + + from("timer://myTimer?fixedRate=true&period=600000") + .to(oaiEndpoint) + .log("hello") + .split(xpath("//*[local-name()='record']")) + .log("${body}") + .process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository)) + .log("????"); + +/* from("direct:processXML") + //.split(xpath("//*[local-name()='record']")) + .log("${exchangeProperty.totalRecords}") + .process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository)) + .log("${exchangeProperty.totalRecords}"); + + + + String date = new Date().toString(); +/* + + from(oaiEndpoint) + .process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository)) + .to("mock:start") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // stop Camel by signalling to the latch + latch.countDown(); + } + }).to("mock:done"); + +*/ +/* + from(oaiEndpoint) + .routeId(date) + .setHeader("MyHeader", constant("start")) + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .log("Myheader " +"${header.myHeader}") + + .to("controlbus:route?routeId="+date+"&action=stop") + .stop() + .endChoice() + .otherwise() + .process(exchange -> exchange.getIn().setHeader("MyHeader", "stop")) + .log("Trying to make the thread stop! " +"${header.myHeader}") + .endChoice(); + */ + + /* from(oaiEndpoint) + .setProperty("totalRecords", constant(0)) + .loopDoWhile(simple("${exchangeProperty.totalRecords} < 5")) + .log("${exchangeProperty.totalRecords}") + //.split(xpath("//*[local-name()='record']")) + //.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n") + .to("direct:ListRecords") + .end(); + + from("direct:ListRecords") + .process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository)); + /* from(oaiEndpoint) + .setProperty("totalRecords", constant(0)) + .loopDoWhile(simple("${exchangeProperty.totalRecords} < 10")) + .split(xpath("//*[local-name()='record']")).streaming() + .log("${exchangeProperty.totalRecords}") + // .log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n") + .process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository)) + .log("${exchangeProperty.totalRecords}") + .end().to("direct:end"); +*/ + /* from("oaipmh://http://repositorium.sdum.uminho.pt/oai/request?verb=ListRecords&metadataPrefix=oai_dc") + .setProperty("totalRecords", constant(0)) + .loopDoWhile().simple("${exchangeProperty.totalRecords} < 10") + .split(xpath("//*[local-name()='record']")) + //.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n") + .process(new XmlProcessor(profile, validationJob)) + .end().to("direct:end");*/ + + + +/*** kai auto from(oaiEndpoint) // trigger the route every minute + .split(xpath("//*[local-name()='record']")) + .setProperty("totalRecords", constant(0)) + .loopDoWhile().simple("${exchangeProperty.totalRecords} < 50") + .process(new XmlProcessor(profile, validationJob)) + .end() + .to("direct:end"); +/***/ + //from(oaiEndpoint).process(new XmlProcessor(profile,validationJob)); + + // from("direct:oaiRequest").process(new DummyXMLProcessor()); + + + + + /*** AUTO PAIZEI !!!!!! ***/ +/* from(oaiEndpoint). + split(xpath("//*[local-name()='record']")).process(new XmlProcessor(profile, validationJob)); +*/ + + /*** KAI AUTO !!!!!! ***/ + /* + from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute + .setProperty("totalRecords", constant(0)) + .loopDoWhile().simple("${exchangeProperty.totalRecords} < 50") + .to("direct:oaiRequest") + .process(new RecordCountProcessor()) + .end() + .to("direct:end"); + + from("direct:oaiRequest"). + process(exchange -> { + int counter = exchange.getProperty("totalRecords", 0, Integer.class); + System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class)); + counter++; + System.out.println("counter ++" + counter); + exchange.setProperty("processedRecords", counter); + System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class)); + + }); + */ + + /** KAI AUTO ***/ + /* + from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute + .setProperty("totalRecords", constant(0)) + .loopDoWhile().simple("${exchangeProperty.totalRecords} < 50") + .to("direct:oaiRequest") + .process(new RecordCountProcessor()) + .end() + .to("direct:end"); + + from("direct:oaiRequest"). + process(exchange -> { + int counter = exchange.getProperty("totalRecords", 0, Integer.class); + System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class)); + counter++; + System.out.println("counter ++" + counter); + exchange.setProperty("processedRecords", counter); + System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class)); + + }); + */ + } + + + class RecordCountProcessor implements Processor { + @Override + public void process(Exchange exchange) throws Exception { + System.out.println("RecordCountProcessor reads"); + int processedRecords = exchange.getProperty("processedRecords", 0, Integer.class); + exchange.setProperty("totalRecords", processedRecords); + + } + }} \ No newline at end of file