From c5de380a11c4f3e33b590e3af66c8bf6dd4c6550 Mon Sep 17 00:00:00 2001 From: Katerina Date: Fri, 22 Sep 2023 14:17:27 +0300 Subject: [PATCH] Fixed problem with stoping routes. There is still a warning, but no Exception --- .../validatorapi/routes/FairOaiPmhRoute2.java | 97 ++++++++++++------- .../validatorapi/routes/OaiSetListRoute.java | 32 +++++- .../routes/SimpleOaiPmhRoute.java | 95 +++++++++++------- 3 files changed, 153 insertions(+), 71 deletions(-) diff --git a/src/main/java/eu/dnetlib/validatorapi/routes/FairOaiPmhRoute2.java b/src/main/java/eu/dnetlib/validatorapi/routes/FairOaiPmhRoute2.java index 70bb608..0f42653 100644 --- a/src/main/java/eu/dnetlib/validatorapi/routes/FairOaiPmhRoute2.java +++ b/src/main/java/eu/dnetlib/validatorapi/routes/FairOaiPmhRoute2.java @@ -13,8 +13,10 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.TypeConversionException; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.engine.DefaultShutdownStrategy; import org.apache.http.client.ClientProtocolException; +import java.net.UnknownHostException; import java.util.Date; import java.util.UUID; @@ -43,6 +45,11 @@ public class FairOaiPmhRoute2 extends RouteBuilder { public void configure() throws Exception { String date = new Date().toString(); + // Access the DefaultShutdownStrategy + DefaultShutdownStrategy shutdownStrategy = (DefaultShutdownStrategy) getContext().getShutdownStrategy(); + // Set the shutdown timeout in milliseconds (e.g., 10 seconds) + shutdownStrategy.setTimeout(1); + onException(TypeConversionException.class) .process(new ExceptionProcessor(validationJob)) .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true").log("\n\n\n\nHEREEEEEE") @@ -61,63 +68,87 @@ public class FairOaiPmhRoute2 extends RouteBuilder { .handled(true) .end(); + onException(UnknownHostException.class) + .process(new ExceptionProcessor(validationJob)) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .to("controlbus:route?routeId="+routeid2+"&action=stop") + .maximumRedeliveries(0) + .handled(true) + .end(); + + + onException(ClientProtocolException.class) + .process(new ExceptionProcessor(validationJob)) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .to("controlbus:route?routeId="+routeid2+"&action=stop") + .maximumRedeliveries(0) + .handled(true) + .end(); + + from(oaiEndpoint) + .routeId("1") .choice() .when(xpath("//*[local-name()='record']")) .multicast().parallelProcessing() .to("direct:guidelinesProcessor") .to("direct:fairProcessor") + .to("controlbus:route?routeId=1&action=stop&async=true") .endChoice() .otherwise() .process(new ErrorProcessor(validationJob)) .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") + .endChoice() .end(); from("direct:guidelinesProcessor") .routeId(routeid) - .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) - .choice() - .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[results]}")) - .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") - .endChoice() - .end() - .choice() - .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[issues]}")) - .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") - .endChoice() - .end() - .choice() - .when(header("MyHeader").isEqualTo("stop")) - .process(new DataBaseProcessor()) - .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") - .to("controlbus:route?routeId="+routeid+"&action=stop") - .endChoice() - .end(); + .split(xpath("//*[local-name()='record']")) + .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) + .choice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() + .end() + .choice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() + .end() + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid+"&action=stop&async=true") + .endChoice() + .end(); from("direct:fairProcessor") .routeId(routeid2) .split(xpath("//*[local-name()='record']")) .process(new XmlProcessor(new FAIR_Data_GuidelinesProfile(), validationJob, maxNumberOfRecords)) .choice() - .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[results]}")) - .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") - .endChoice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() .end() .choice() - .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[issues]}")) - .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") - .endChoice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() .end() .choice() - .when(header("MyHeader").isEqualTo("stop")) - .process(new DataBaseProcessor()) - .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") - .to("controlbus:route?routeId="+routeid2+"&action=stop") - .endChoice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid2+"&action=stop&async=true") + .endChoice() .end(); /* diff --git a/src/main/java/eu/dnetlib/validatorapi/routes/OaiSetListRoute.java b/src/main/java/eu/dnetlib/validatorapi/routes/OaiSetListRoute.java index ea2ba79..becc05a 100644 --- a/src/main/java/eu/dnetlib/validatorapi/routes/OaiSetListRoute.java +++ b/src/main/java/eu/dnetlib/validatorapi/routes/OaiSetListRoute.java @@ -1,9 +1,9 @@ package eu.dnetlib.validatorapi.routes; +import org.apache.camel.TypeConversionException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.engine.DefaultShutdownStrategy; -import org.json.JSONObject; -import org.json.XML; +import org.apache.http.client.ClientProtocolException; import org.springframework.stereotype.Component; import org.w3c.dom.Document; import org.w3c.dom.Node; @@ -20,6 +20,7 @@ import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; import java.io.StringReader; import java.io.StringWriter; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -35,7 +36,31 @@ public class OaiSetListRoute extends RouteBuilder { // Set the shutdown timeout in milliseconds (e.g., 10 seconds) shutdownStrategy.setTimeout(1); + onException(TypeConversionException.class) + .maximumRedeliveries(0) + .handled(true) + .end(); + + onException(ClientProtocolException.class) + .maximumRedeliveries(0) + .handled(true) + .end(); + + onException(UnknownHostException.class) + .maximumRedeliveries(0) + .handled(true) + .end(); + + onException(ClientProtocolException.class) + .maximumRedeliveries(0) + .handled(true) + .end(); + from("direct:getResponse") + .log("\n\n\n LIST ") + .end(); + + /*from("direct:getResponse") .process(exchange -> { String endpoint = exchange.getIn().getHeader("endpoint", String.class); exchange.getIn().setHeader("dynamicEndpoint", endpoint); @@ -56,7 +81,8 @@ public class OaiSetListRoute extends RouteBuilder { JSONObject jsonObject = XML.toJSONObject(xmlSets); String jsonString = jsonObject.toString(); exchange.getIn().setBody(jsonString); - }); + }) + .end();*/ } /*from("direct:processSets") diff --git a/src/main/java/eu/dnetlib/validatorapi/routes/SimpleOaiPmhRoute.java b/src/main/java/eu/dnetlib/validatorapi/routes/SimpleOaiPmhRoute.java index e21de77..fa37441 100644 --- a/src/main/java/eu/dnetlib/validatorapi/routes/SimpleOaiPmhRoute.java +++ b/src/main/java/eu/dnetlib/validatorapi/routes/SimpleOaiPmhRoute.java @@ -12,8 +12,10 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.TypeConversionException; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.engine.DefaultShutdownStrategy; import org.apache.http.client.ClientProtocolException; +import java.net.UnknownHostException; import java.util.Date; public class SimpleOaiPmhRoute extends RouteBuilder { @@ -40,6 +42,10 @@ public class SimpleOaiPmhRoute extends RouteBuilder { public void configure() throws Exception { String date = new Date().toString(); + // Access the DefaultShutdownStrategy + DefaultShutdownStrategy shutdownStrategy = (DefaultShutdownStrategy) getContext().getShutdownStrategy(); + // Set the shutdown timeout in milliseconds (e.g., 10 seconds) + shutdownStrategy.setTimeout(1); //TODO: Do I need this? onException(TypeConversionException.class) .process(new ExceptionProcessor(validationJob)) @@ -57,41 +63,60 @@ public class SimpleOaiPmhRoute extends RouteBuilder { .handled(true) .end(); + onException(UnknownHostException.class) + .process(new ExceptionProcessor(validationJob)) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .maximumRedeliveries(0) + .handled(true) + .end(); + + onException(ClientProtocolException.class) + .process(new ExceptionProcessor(validationJob)) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") + .to("controlbus:route?routeId="+routeid+"&action=stop") + .maximumRedeliveries(0) + .handled(true) + .end(); + + +/* + .process(new TypeConversionExceptionProcessor(validationJob)).log("\n\n\n\n HERE 2") + .process(new DataBaseProcessor())//.maximumRedeliveries(0) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true").log("\n\n\n\nHEREEEEEE") + .stop(); +*/ from(oaiEndpoint) - .routeId(routeid) + .routeId(routeid) .choice() .when(xpath("//*[local-name()='record']")) - .to("direct:oaipmhProcessor") + .split(xpath("//*[local-name()='record']")) + .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) + .choice() + .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[results]}")) + .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") + .endChoice() + .end() + .choice() + .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) + .split(simple("${body[issues]}")) + .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") + .endChoice() + .end() + .choice() + .when(header("MyHeader").isEqualTo("stop")) + .process(new DataBaseProcessor()) + .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") + .to("controlbus:route?routeId="+routeid+"&action=stop&async=true") + .endChoice() + .end() .endChoice() .otherwise() .process(new ErrorProcessor(validationJob)) .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true") - .to("controlbus:route?routeId="+routeid+"&action=stop") .end(); - from("direct:oaiPmhProcessor") - .split(xpath("//*[local-name()='record']")) - .process(new XmlProcessor(profile, validationJob, maxNumberOfRecords)) - .choice() - .when(simple("${body[results]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[results]}")) - .to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true") - .endChoice() - .end() - .choice() - .when(simple("${body[issues]} && ${header.MyHeader} != 'stop'")) - .split(simple("${body[issues]}")) - .to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true") - .endChoice() - .end() - .choice() - .when(header("MyHeader").isEqualTo("stop")) - .process(new DataBaseProcessor()) - .to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase") - .to("controlbus:route?routeId="+routeid+"&action=stop") - .endChoice() - .end(); - /* from("direct:saveToDatabase") .routeId(routeid2) @@ -188,16 +213,16 @@ public class SimpleOaiPmhRoute extends RouteBuilder { /*** kai auto from(oaiEndpoint) // trigger the route every minute - .split(xpath("//*[local-name()='record']")) - .setProperty("totalRecords", constant(0)) - .loopDoWhile().simple("${exchangeProperty.totalRecords} < 50") - .process(new XmlProcessor(profile, validationJob)) - .end() - .to("direct:end"); -/***/ + .split(xpath("//*[local-name()='record']")) + .setProperty("totalRecords", constant(0)) + .loopDoWhile().simple("${exchangeProperty.totalRecords} < 50") + .process(new XmlProcessor(profile, validationJob)) + .end() + .to("direct:end"); + /***/ //from(oaiEndpoint).process(new XmlProcessor(profile,validationJob)); - // from("direct:oaiRequest").process(new DummyXMLProcessor()); + // from("direct:oaiRequest").process(new DummyXMLProcessor()); @@ -261,4 +286,4 @@ public class SimpleOaiPmhRoute extends RouteBuilder { exchange.setProperty("totalRecords", processedRecords); } - }} \ No newline at end of file + }}