Fixed problem with stoping routes. There is still a warning, but no Exception

This commit is contained in:
Katerina 2023-09-22 14:17:27 +03:00
parent 2454c900aa
commit c5de380a11
3 changed files with 153 additions and 71 deletions

View File

@ -13,8 +13,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.DefaultShutdownStrategy;
import org.apache.http.client.ClientProtocolException;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.UUID;
@ -43,6 +45,11 @@ public class FairOaiPmhRoute2 extends RouteBuilder {
public void configure() throws Exception {
String date = new Date().toString();
// Access the DefaultShutdownStrategy
DefaultShutdownStrategy shutdownStrategy = (DefaultShutdownStrategy) getContext().getShutdownStrategy();
// Set the shutdown timeout in milliseconds (e.g., 10 seconds)
shutdownStrategy.setTimeout(1);
onException(TypeConversionException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true").log("\n\n\n\nHEREEEEEE")
@ -61,63 +68,87 @@ public class FairOaiPmhRoute2 extends RouteBuilder {
.handled(true)
.end();
onException(UnknownHostException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.to("controlbus:route?routeId="+routeid2+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
onException(ClientProtocolException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.to("controlbus:route?routeId="+routeid2+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
from(oaiEndpoint)
.routeId("1")
.choice()
.when(xpath("//*[local-name()='record']"))
.multicast().parallelProcessing()
.to("direct:guidelinesProcessor")
.to("direct:fairProcessor")
.to("controlbus:route?routeId=1&action=stop&async=true")
.endChoice()
.otherwise()
.process(new ErrorProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.endChoice()
.end();
from("direct:guidelinesProcessor")
.routeId(routeid)
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.endChoice()
.end();
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid+"&action=stop&async=true")
.endChoice()
.end();
from("direct:fairProcessor")
.routeId(routeid2)
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(new FAIR_Data_GuidelinesProfile(), validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid2+"&action=stop")
.endChoice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid2+"&action=stop&async=true")
.endChoice()
.end();
/*

View File

@ -1,9 +1,9 @@
package eu.dnetlib.validatorapi.routes;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.DefaultShutdownStrategy;
import org.json.JSONObject;
import org.json.XML;
import org.apache.http.client.ClientProtocolException;
import org.springframework.stereotype.Component;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@ -20,6 +20,7 @@ import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@ -35,7 +36,31 @@ public class OaiSetListRoute extends RouteBuilder {
// Set the shutdown timeout in milliseconds (e.g., 10 seconds)
shutdownStrategy.setTimeout(1);
onException(TypeConversionException.class)
.maximumRedeliveries(0)
.handled(true)
.end();
onException(ClientProtocolException.class)
.maximumRedeliveries(0)
.handled(true)
.end();
onException(UnknownHostException.class)
.maximumRedeliveries(0)
.handled(true)
.end();
onException(ClientProtocolException.class)
.maximumRedeliveries(0)
.handled(true)
.end();
from("direct:getResponse")
.log("\n\n\n LIST ")
.end();
/*from("direct:getResponse")
.process(exchange -> {
String endpoint = exchange.getIn().getHeader("endpoint", String.class);
exchange.getIn().setHeader("dynamicEndpoint", endpoint);
@ -56,7 +81,8 @@ public class OaiSetListRoute extends RouteBuilder {
JSONObject jsonObject = XML.toJSONObject(xmlSets);
String jsonString = jsonObject.toString();
exchange.getIn().setBody(jsonString);
});
})
.end();*/
}
/*from("direct:processSets")

View File

@ -12,8 +12,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.DefaultShutdownStrategy;
import org.apache.http.client.ClientProtocolException;
import java.net.UnknownHostException;
import java.util.Date;
public class SimpleOaiPmhRoute extends RouteBuilder {
@ -40,6 +42,10 @@ public class SimpleOaiPmhRoute extends RouteBuilder {
public void configure() throws Exception {
String date = new Date().toString();
// Access the DefaultShutdownStrategy
DefaultShutdownStrategy shutdownStrategy = (DefaultShutdownStrategy) getContext().getShutdownStrategy();
// Set the shutdown timeout in milliseconds (e.g., 10 seconds)
shutdownStrategy.setTimeout(1); //TODO: Do I need this?
onException(TypeConversionException.class)
.process(new ExceptionProcessor(validationJob))
@ -57,41 +63,60 @@ public class SimpleOaiPmhRoute extends RouteBuilder {
.handled(true)
.end();
onException(UnknownHostException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
onException(ClientProtocolException.class)
.process(new ExceptionProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.maximumRedeliveries(0)
.handled(true)
.end();
/*
.process(new TypeConversionExceptionProcessor(validationJob)).log("\n\n\n\n HERE 2")
.process(new DataBaseProcessor())//.maximumRedeliveries(0)
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true").log("\n\n\n\nHEREEEEEE")
.stop();
*/
from(oaiEndpoint)
.routeId(routeid)
.routeId(routeid)
.choice()
.when(xpath("//*[local-name()='record']"))
.to("direct:oaipmhProcessor")
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid+"&action=stop&async=true")
.endChoice()
.end()
.endChoice()
.otherwise()
.process(new ErrorProcessor(validationJob))
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.end();
from("direct:oaiPmhProcessor")
.split(xpath("//*[local-name()='record']"))
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
.choice()
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[results]}"))
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
.endChoice()
.end()
.choice()
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
.split(simple("${body[issues]}"))
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
.endChoice()
.end()
.choice()
.when(header("MyHeader").isEqualTo("stop"))
.process(new DataBaseProcessor())
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
.to("controlbus:route?routeId="+routeid+"&action=stop")
.endChoice()
.end();
/*
from("direct:saveToDatabase")
.routeId(routeid2)
@ -188,16 +213,16 @@ public class SimpleOaiPmhRoute extends RouteBuilder {
/*** kai auto from(oaiEndpoint) // trigger the route every minute
.split(xpath("//*[local-name()='record']"))
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
.process(new XmlProcessor(profile, validationJob))
.end()
.to("direct:end");
/***/
.split(xpath("//*[local-name()='record']"))
.setProperty("totalRecords", constant(0))
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
.process(new XmlProcessor(profile, validationJob))
.end()
.to("direct:end");
/***/
//from(oaiEndpoint).process(new XmlProcessor(profile,validationJob));
// from("direct:oaiRequest").process(new DummyXMLProcessor());
// from("direct:oaiRequest").process(new DummyXMLProcessor());
@ -261,4 +286,4 @@ public class SimpleOaiPmhRoute extends RouteBuilder {
exchange.setProperty("totalRecords", processedRecords);
}
}}
}}