package eu.dnetlib.validatorapi.routes;

import eu.dnetlib.validator2.validation.guideline.openaire.AbstractOpenAireProfile;
import eu.dnetlib.validator2.validation.guideline.openaire.FAIR_Data_GuidelinesProfile;
import eu.dnetlib.validatorapi.entities.ValidationIssue;
import eu.dnetlib.validatorapi.entities.ValidationJob;
import eu.dnetlib.validatorapi.entities.ValidationRuleResult;
import eu.dnetlib.validatorapi.processors.DataBaseProcessor;
import eu.dnetlib.validatorapi.processors.ExceptionProcessor;
import eu.dnetlib.validatorapi.processors.XmlProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.http.client.ClientProtocolException;

import java.util.Date;
import java.util.UUID;

/**
 * Camel route builder that consumes an OAI-PMH endpoint and validates every harvested
 * record twice in parallel: once against the profile supplied by the caller and once
 * against {@link FAIR_Data_GuidelinesProfile}.
 *
 * <p>Three routes are defined:
 * <ol>
 *   <li>a source route that multicasts each message from {@code oaiEndpoint} to the two
 *       validation routes below;</li>
 *   <li>{@code direct:guidelinesProcessor} (route id {@link #routeid}), validating with the
 *       caller-supplied profile;</li>
 *   <li>{@code direct:fairProcessor} (route id {@link #routeid2}), validating with the FAIR
 *       data profile.</li>
 * </ol>
 *
 * <p>NOTE(review): the source route is given no route id and is never stopped by this
 * builder; only the two validation routes are. Confirm that the caller shuts it down.
 */
public class FairOaiPmhRoute2 extends RouteBuilder {

    private static final String GUIDELINES_ENDPOINT = "direct:guidelinesProcessor";
    private static final String FAIR_ENDPOINT = "direct:fairProcessor";

    /** Selects every element named {@code record}, whatever its namespace. */
    private static final String RECORD_XPATH = "//*[local-name()='record']";

    /** Header inspected by the routes; the value {@code stop} triggers job finalisation. */
    private static final String CONTROL_HEADER = "MyHeader";
    private static final String STOP_SIGNAL = "stop";

    private static final String HAS_RESULTS = "${body[results]} && ${header.MyHeader} != 'stop'";
    private static final String HAS_ISSUES = "${body[issues]} && ${header.MyHeader} != 'stop'";

    private static final String PERSIST_RULE_RESULT_URI =
            "jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true";
    private static final String PERSIST_ISSUE_URI =
            "jpa:" + ValidationIssue.class.getName() + "?usePersist=true";
    private static final String UPDATE_JOB_URI =
            "jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true";

    private final String oaiEndpoint;
    private final AbstractOpenAireProfile profile;
    private final long maxNumberOfRecords;
    private final ValidationJob validationJob;

    /** Id of the route validating against the caller-supplied profile. */
    String routeid;

    /** Id of the route validating against the FAIR data profile; generated randomly. */
    String routeid2;

    /**
     * @param oaiEndpoint        Camel endpoint URI the records are consumed from
     * @param profile            guideline profile used by the first validation route
     * @param validationJob      job entity handed to the record and exception processors
     * @param maxNumberOfRecords record limit passed through to {@link XmlProcessor}
     * @param routeid            id assigned to the first validation route
     */
    public FairOaiPmhRoute2(String oaiEndpoint, AbstractOpenAireProfile profile,
                            ValidationJob validationJob, long maxNumberOfRecords,
                            String routeid) {
        this.oaiEndpoint = oaiEndpoint;
        this.validationJob = validationJob;
        this.profile = profile;
        this.maxNumberOfRecords = maxNumberOfRecords;
        this.routeid = routeid;
        this.routeid2 = UUID.randomUUID().toString();
    }

    /**
     * Registers the exception clauses, the multicasting source route and the two
     * validation routes.
     */
    @Override
    public void configure() throws Exception {
        // Camel requires onException clauses to be declared before the first from(...).
        stopRoutesOn(TypeConversionException.class);
        stopRoutesOn(ClientProtocolException.class);

        from(oaiEndpoint)
            .multicast().parallelProcessing()
                .to(GUIDELINES_ENDPOINT)
                .to(FAIR_ENDPOINT)
            .end();

        configureValidationRoute(GUIDELINES_ENDPOINT, routeid,
                new XmlProcessor(profile, validationJob, maxNumberOfRecords));
        configureValidationRoute(FAIR_ENDPOINT, routeid2,
                new XmlProcessor(new FAIR_Data_GuidelinesProfile(), validationJob, maxNumberOfRecords));
    }

    /**
     * Declares an exception clause for {@code exceptionType}: the exception is passed to an
     * {@link ExceptionProcessor}, the job is updated through JPA, both validation routes are
     * asked to stop, and the exception is marked as handled without any redelivery.
     */
    private void stopRoutesOn(Class<? extends Throwable> exceptionType) {
        onException(exceptionType)
            .process(new ExceptionProcessor(validationJob))
            .to(UPDATE_JOB_URI)
            .to(stopRouteUri(routeid))
            .to(stopRouteUri(routeid2))
            .maximumRedeliveries(0)
            .handled(true)
            .end();
    }

    /**
     * Defines one validation route.
     *
     * <p>Each incoming message is split into its {@code record} elements and every record is
     * passed to {@code recordValidator}. While the control header is not {@code stop}, the
     * {@code results} and {@code issues} entries of the body are split and persisted one by
     * one. When the header equals {@code stop}, a {@link DataBaseProcessor} runs, the job is
     * updated through JPA and the route is asked to stop itself.
     *
     * <p>The control header is not set anywhere in this class; presumably
     * {@code recordValidator} sets it once the record limit is reached (TODO confirm).
     *
     * @param sourceUri       endpoint the route consumes from
     * @param id              Camel route id, also used as the target of the stop command
     * @param recordValidator processor applied to every split record
     */
    private void configureValidationRoute(String sourceUri, String id, Processor recordValidator) {
        // NOTE(review): this controlbus stop is issued synchronously from inside the route
        // being stopped, which may block while Camel waits for the in-flight exchange.
        // Consider the controlbus "async=true" option after confirming against the Camel
        // version in use.
        from(sourceUri)
            .routeId(id)
            .split(xpath(RECORD_XPATH))
                .process(recordValidator)
                .choice()
                    .when(simple(HAS_RESULTS))
                        .split(simple("${body[results]}"))
                            .to(PERSIST_RULE_RESULT_URI)
                    .endChoice()
                .end()
                .choice()
                    .when(simple(HAS_ISSUES))
                        .split(simple("${body[issues]}"))
                            .to(PERSIST_ISSUE_URI)
                    .endChoice()
                .end()
                .choice()
                    .when(header(CONTROL_HEADER).isEqualTo(STOP_SIGNAL))
                        .process(new DataBaseProcessor())
                        .to(UPDATE_JOB_URI)
                        .to(stopRouteUri(id))
                    .endChoice()
                .end();
    }

    /** Builds the controlbus URI that stops the route with the given id. */
    private static String stopRouteUri(String id) {
        return "controlbus:route?routeId=" + id + "&action=stop";
    }

    /**
     * Copies the {@code processedRecords} exchange property (0 when absent) into the
     * {@code totalRecords} exchange property. Not referenced by any route in this builder.
     */
    class RecordCountProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println("RecordCountProcessor reads");
            int processedRecords = exchange.getProperty("processedRecords", 0, Integer.class);
            exchange.setProperty("totalRecords", processedRecords);
        }
    }
}