uoa-validator-api/src/main/java/eu/dnetlib/validatorapi/routes/SimpleOaiPmhRoute.java

264 lines
11 KiB
Java

package eu.dnetlib.validatorapi.routes;
import eu.dnetlib.validator2.validation.guideline.openaire.AbstractOpenAireProfile;
import eu.dnetlib.validatorapi.entities.ValidationIssue;
import eu.dnetlib.validatorapi.entities.ValidationJob;
import eu.dnetlib.validatorapi.entities.ValidationRuleResult;
import eu.dnetlib.validatorapi.processors.DataBaseProcessor;
import eu.dnetlib.validatorapi.processors.ErrorProcessor;
import eu.dnetlib.validatorapi.processors.ExceptionProcessor;
import eu.dnetlib.validatorapi.processors.XmlProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.http.client.ClientProtocolException;
import java.util.Date;
public class SimpleOaiPmhRoute extends RouteBuilder {
private String oaiEndpoint;
private AbstractOpenAireProfile profile;
private long maxNumberOfRecords = 0;
private ValidationJob validationJob;
String routeid;
public SimpleOaiPmhRoute(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob,
long maxNumberOfRecords, String routeid) {
this.oaiEndpoint = oaiEndpoint;
this.validationJob = validationJob;
this.profile = profile;
this.maxNumberOfRecords = maxNumberOfRecords;
this.routeid = routeid;
}
@Override
public void configure() throws Exception {
String date = new Date().toString();
onException(TypeConversionException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true").log("\n\n\n\nHEREEEEEE")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
onException(ClientProtocolException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
from(oaiEndpoint)
.routeId(routeid)
.choice()
.when(xpath("//*[local-name()='record']"))
.multicast().parallelProcessing()
.to("direct:oaipmhProcessor")
.endChoice()
.otherwise()
.process(new ErrorProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.end();
from("direct:oaiPmhProcessor")
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.endChoice()
.end();
/*
from("direct:saveToDatabase")
.routeId(routeid2)
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true");
*/
/*from("timer://myTimer?fixedRate=true&period=60000")
.routeId("OAIProcessingRoute")
.setHeader("MyHeader", constant("stop"))
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.toD("direct:oaiEndpoint")
.otherwise()
.stop()
.end();
from("timer://myTimer?fixedRate=true&period=600000")
.to(oaiEndpoint)
.log("hello")
.split(xpath("//*[local-name()='record']"))
.log("${body}")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.log("????");
/* from("direct:processXML")
//.split(xpath("//*[local-name()='record']"))
.log("${exchangeProperty.totalRecords}")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.log("${exchangeProperty.totalRecords}");
String date = new Date().toString();
/*
from(oaiEndpoint)
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.to("mock:start")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// stop Camel by signalling to the latch
latch.countDown();
}
}).to("mock:done");
*/
/*
from(oaiEndpoint)
.routeId(date)
.setHeader("MyHeader", constant("start"))
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.log("Myheader " +"${header.myHeader}")
.to("controlbus:route?routeId="+date+"&action=stop")
.stop()
.endChoice()
.otherwise()
.process(exchange -> exchange.getIn().setHeader("MyHeader", "stop"))
.log("Trying to make the thread stop! " +"${header.myHeader}")
.endChoice();
*/
/* from(oaiEndpoint)
.setProperty("totalRecords", constant(0))
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 5"))
.log("${exchangeProperty.totalRecords}")
//.split(xpath("//*[local-name()='record']"))
//.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
.to("direct:ListRecords")
.end();
from("direct:ListRecords")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository));
/* from(oaiEndpoint)
.setProperty("totalRecords", constant(0))
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 10"))
.split(xpath("//*[local-name()='record']")).streaming()
.log("${exchangeProperty.totalRecords}")
// .log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
.log("${exchangeProperty.totalRecords}")
.end().to("direct:end");
*/
/* from("oaipmh://http://repositorium.sdum.uminho.pt/oai/request?verb=ListRecords&metadataPrefix=oai_dc")
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 10")
.split(xpath("//*[local-name()='record']"))
//.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
.process(new XmlProcessor(profile, validationJob))
.end().to("direct:end");*/
/*** kai auto from(oaiEndpoint) // trigger the route every minute
.split(xpath("//*[local-name()='record']"))
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
.process(new XmlProcessor(profile, validationJob))
.end()
.to("direct:end");
/***/
//from(oaiEndpoint).process(new XmlProcessor(profile,validationJob));
// from("direct:oaiRequest").process(new DummyXMLProcessor());
/*** AUTO PAIZEI !!!!!! ***/
/* from(oaiEndpoint).
split(xpath("//*[local-name()='record']")).process(new XmlProcessor(profile, validationJob));
*/
/*** KAI AUTO !!!!!! ***/
/*
from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
.to("direct:oaiRequest")
.process(new RecordCountProcessor())
.end()
.to("direct:end");
from("direct:oaiRequest").
process(exchange -> {
int counter = exchange.getProperty("totalRecords", 0, Integer.class);
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
counter++;
System.out.println("counter ++" + counter);
exchange.setProperty("processedRecords", counter);
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
});
*/
/** KAI AUTO ***/
/*
from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
.to("direct:oaiRequest")
.process(new RecordCountProcessor())
.end()
.to("direct:end");
from("direct:oaiRequest").
process(exchange -> {
int counter = exchange.getProperty("totalRecords", 0, Integer.class);
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
counter++;
System.out.println("counter ++" + counter);
exchange.setProperty("processedRecords", counter);
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
});
*/
}
class RecordCountProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("RecordCountProcessor reads");
int processedRecords = exchange.getProperty("processedRecords", 0, Integer.class);
exchange.setProperty("totalRecords", processedRecords);
}
}}