Simple and Fair route added with unique route ids
This commit is contained in:
parent
04fd403ac8
commit
69818d6b22
|
@ -9,7 +9,7 @@ import eu.dnetlib.validatorapi.entities.ValidationRuleResult;
|
||||||
import eu.dnetlib.validatorapi.repositories.ValidationIssueRepository;
|
import eu.dnetlib.validatorapi.repositories.ValidationIssueRepository;
|
||||||
import eu.dnetlib.validatorapi.repositories.ValidationJobRepository;
|
import eu.dnetlib.validatorapi.repositories.ValidationJobRepository;
|
||||||
import eu.dnetlib.validatorapi.repositories.ValidationResultRepository;
|
import eu.dnetlib.validatorapi.repositories.ValidationResultRepository;
|
||||||
import eu.dnetlib.validatorapi.routes.OaiPmhRoute;
|
import eu.dnetlib.validatorapi.routes.OaiPmhRoute2;
|
||||||
import org.apache.camel.CamelContext;
|
import org.apache.camel.CamelContext;
|
||||||
import org.apache.camel.ProducerTemplate;
|
import org.apache.camel.ProducerTemplate;
|
||||||
import org.apache.camel.builder.RouteBuilder;
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
@ -40,10 +40,7 @@ import java.io.StringReader;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@CrossOrigin(origins = "*")
|
@CrossOrigin(origins = "*")
|
||||||
|
@ -211,9 +208,13 @@ public class ValidationController {
|
||||||
DocumentBuilder db = dbf.newDocumentBuilder();
|
DocumentBuilder db = dbf.newDocumentBuilder();
|
||||||
|
|
||||||
|
|
||||||
|
UUID uuid = UUID.randomUUID();
|
||||||
|
System.out.println("\n\n\n " + uuid);
|
||||||
RouteBuilder oaiPmhRouteBuilder =
|
RouteBuilder oaiPmhRouteBuilder =
|
||||||
new OaiPmhRoute("oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix ,
|
new OaiPmhRoute2("oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix ,
|
||||||
profile, validationJob, numberOfRecords);
|
profile, validationJob, numberOfRecords, uuid.toString());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
RouteBuilder oaiPmhRouteBuilder =
|
RouteBuilder oaiPmhRouteBuilder =
|
||||||
|
@ -520,4 +521,26 @@ public class ValidationController {
|
||||||
|
|
||||||
return records;
|
return records;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RequestMapping(value = {"/test"}, method = RequestMethod.GET)
|
||||||
|
public void test(@RequestParam(name = "guidelines") String guidelinesProfileName,
|
||||||
|
@RequestParam(name = "baseUrl", defaultValue = "http://repositorium.sdum.uminho.pt/oai/request") String baseURL //not in use now
|
||||||
|
//@RequestParam(name="metadataPrefix", defaultValue = "oai_dc") String metadataPrefix
|
||||||
|
) {
|
||||||
|
|
||||||
|
AbstractOpenAireProfile profile = initializeOpenAireProfile(guidelinesProfileName);
|
||||||
|
AbstractOpenAireProfile fairProfile = initializeFairProfile(guidelinesProfileName);
|
||||||
|
String metadataPrefix = initializeMetadataPrefix(guidelinesProfileName);
|
||||||
|
|
||||||
|
String endpoint = "oaipmh://"+baseURL + "?verb=ListRecords&metadataPrefix=" + metadataPrefix;
|
||||||
|
ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
|
||||||
|
// String response = producerTemplate.requestBodyAndHeader("direct:oaipmhRequest", null, "endpoint", endpoint, String.class);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
producerTemplate.sendBody("direct:startProcessing", endpoint);
|
||||||
|
|
||||||
|
//String response = camelContext.createProducerTemplate().requestBody("direct:oaipmhRequest", endpoint, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.apache.camel.Processor;
|
||||||
import org.apache.camel.builder.RouteBuilder;
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
public class OaiPmhRoute extends RouteBuilder {
|
public class OaiPmhRoute extends RouteBuilder {
|
||||||
|
@ -20,21 +21,20 @@ public class OaiPmhRoute extends RouteBuilder {
|
||||||
private AbstractOpenAireProfile profile;
|
private AbstractOpenAireProfile profile;
|
||||||
private long maxNumberOfRecords = 0;
|
private long maxNumberOfRecords = 0;
|
||||||
private ValidationJob validationJob;
|
private ValidationJob validationJob;
|
||||||
//private final ValidationJobRepository validationJobRepository;
|
CountDownLatch latch;
|
||||||
//private final ValidationIssueRepository validationIssueRepository;
|
String routeid;
|
||||||
//private final ValidationResultRepository validationResultRepository;
|
String routeid2;
|
||||||
CountDownLatch latch;
|
|
||||||
|
|
||||||
|
|
||||||
public OaiPmhRoute(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob,
|
public OaiPmhRoute(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob,
|
||||||
long maxNumberOfRecords) {
|
long maxNumberOfRecords, String routeid) {
|
||||||
this.oaiEndpoint = oaiEndpoint;
|
this.oaiEndpoint = oaiEndpoint;
|
||||||
this.validationJob = validationJob;
|
this.validationJob = validationJob;
|
||||||
this.profile = profile;
|
this.profile = profile;
|
||||||
this.maxNumberOfRecords = maxNumberOfRecords;
|
this.maxNumberOfRecords = maxNumberOfRecords;
|
||||||
// this.validationJobRepository = validationJobRepository;
|
this.routeid = routeid;
|
||||||
// this.validationIssueRepository = validationIssueRepository;
|
this.routeid2 = UUID.randomUUID().toString();
|
||||||
// this.validationResultRepository = validationResultRepository;
|
|
||||||
this.latch = latch;
|
this.latch = latch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,35 +44,35 @@ public class OaiPmhRoute extends RouteBuilder {
|
||||||
String date = new Date().toString();
|
String date = new Date().toString();
|
||||||
|
|
||||||
from(oaiEndpoint)
|
from(oaiEndpoint)
|
||||||
.routeId(date)
|
.routeId(routeid)
|
||||||
.split(xpath("//*[local-name()='record']"))
|
.split(xpath("//*[local-name()='record']"))
|
||||||
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
|
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
|
||||||
.choice()
|
.choice()
|
||||||
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
|
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
|
||||||
.log("HERE!!!")
|
.split(simple("${body[results]}"))
|
||||||
.split(simple("${body[issues]}"))
|
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
|
||||||
.to("jpa:"+ ValidationRuleResult.class.getName()+ "?usePersist=true")
|
.endChoice()
|
||||||
.endChoice()
|
.end()
|
||||||
.end()
|
.choice()
|
||||||
.choice()
|
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
|
||||||
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
|
.split(simple("${body[issues]}"))
|
||||||
.log("THERE")
|
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
|
||||||
.split(simple("${body[results]}"))
|
.endChoice()
|
||||||
.to("jpa:" + ValidationIssue.class.getName() + "?usePersist=true")
|
.end()
|
||||||
.endChoice()
|
.choice()
|
||||||
.end()
|
.when(header("MyHeader").isEqualTo("stop"))
|
||||||
.choice()
|
.process(new DataBaseProcessor())
|
||||||
.when(header("MyHeader").isEqualTo("stop"))
|
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
|
||||||
.to("direct:saveToDatabase")
|
.to("controlbus:route?routeId="+routeid+"&action=stop")
|
||||||
.to("controlbus:route?routeId="+date+"&action=stop")
|
.endChoice()
|
||||||
.endChoice()
|
.end();
|
||||||
.end();
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
from("direct:saveToDatabase")
|
from("direct:saveToDatabase")
|
||||||
|
.routeId(routeid2)
|
||||||
.process(new DataBaseProcessor())
|
.process(new DataBaseProcessor())
|
||||||
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true");
|
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true");
|
||||||
|
*/
|
||||||
/*from("timer://myTimer?fixedRate=true&period=60000")
|
/*from("timer://myTimer?fixedRate=true&period=60000")
|
||||||
.routeId("OAIProcessingRoute")
|
.routeId("OAIProcessingRoute")
|
||||||
.setHeader("MyHeader", constant("stop"))
|
.setHeader("MyHeader", constant("stop"))
|
||||||
|
|
|
@ -0,0 +1,270 @@
|
||||||
|
package eu.dnetlib.validatorapi.routes;
|
||||||
|
|
||||||
|
import eu.dnetlib.validator2.validation.guideline.openaire.AbstractOpenAireProfile;
|
||||||
|
import eu.dnetlib.validator2.validation.guideline.openaire.FAIR_Data_GuidelinesProfile;
|
||||||
|
import eu.dnetlib.validatorapi.entities.ValidationIssue;
|
||||||
|
import eu.dnetlib.validatorapi.entities.ValidationJob;
|
||||||
|
import eu.dnetlib.validatorapi.entities.ValidationRuleResult;
|
||||||
|
import eu.dnetlib.validatorapi.processors.DataBaseProcessor;
|
||||||
|
import eu.dnetlib.validatorapi.processors.XmlProcessor;
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.Processor;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
public class OaiPmhRoute2 extends RouteBuilder {
|
||||||
|
|
||||||
|
private String oaiEndpoint;
|
||||||
|
|
||||||
|
private AbstractOpenAireProfile profile;
|
||||||
|
private long maxNumberOfRecords = 0;
|
||||||
|
private ValidationJob validationJob;
|
||||||
|
CountDownLatch latch;
|
||||||
|
String routeid;
|
||||||
|
String routeid2;
|
||||||
|
|
||||||
|
|
||||||
|
public OaiPmhRoute2(String oaiEndpoint, AbstractOpenAireProfile profile, ValidationJob validationJob,
|
||||||
|
long maxNumberOfRecords, String routeid) {
|
||||||
|
this.oaiEndpoint = oaiEndpoint;
|
||||||
|
this.validationJob = validationJob;
|
||||||
|
this.profile = profile;
|
||||||
|
this.maxNumberOfRecords = maxNumberOfRecords;
|
||||||
|
this.routeid = routeid;
|
||||||
|
this.routeid2 = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure() throws Exception {
|
||||||
|
|
||||||
|
String date = new Date().toString();
|
||||||
|
|
||||||
|
from(oaiEndpoint)
|
||||||
|
.multicast().parallelProcessing()
|
||||||
|
.to("direct:guidelinesProcessor")
|
||||||
|
.to("direct:fairProcessor")
|
||||||
|
.end();
|
||||||
|
|
||||||
|
from("direct:guidelinesProcessor")
|
||||||
|
.routeId(routeid)
|
||||||
|
.split(xpath("//*[local-name()='record']"))
|
||||||
|
.process(new XmlProcessor(profile, validationJob, maxNumberOfRecords))
|
||||||
|
.choice()
|
||||||
|
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
|
||||||
|
.split(simple("${body[results]}"))
|
||||||
|
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
|
||||||
|
.endChoice()
|
||||||
|
.end()
|
||||||
|
.choice()
|
||||||
|
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
|
||||||
|
.split(simple("${body[issues]}"))
|
||||||
|
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
|
||||||
|
.endChoice()
|
||||||
|
.end()
|
||||||
|
.choice()
|
||||||
|
.when(header("MyHeader").isEqualTo("stop"))
|
||||||
|
.process(new DataBaseProcessor())
|
||||||
|
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
|
||||||
|
.to("controlbus:route?routeId="+routeid+"&action=stop")
|
||||||
|
.endChoice()
|
||||||
|
.end();
|
||||||
|
|
||||||
|
from("direct:fairProcessor")
|
||||||
|
.routeId(routeid2)
|
||||||
|
.split(xpath("//*[local-name()='record']"))
|
||||||
|
.process(new XmlProcessor(new FAIR_Data_GuidelinesProfile(), validationJob, maxNumberOfRecords))
|
||||||
|
.choice()
|
||||||
|
.when(simple("${body[results]} && ${header.MyHeader} != 'stop'"))
|
||||||
|
.split(simple("${body[results]}"))
|
||||||
|
.to("jpa:" + ValidationRuleResult.class.getName() + "?usePersist=true")
|
||||||
|
.endChoice()
|
||||||
|
.end()
|
||||||
|
.choice()
|
||||||
|
.when(simple("${body[issues]} && ${header.MyHeader} != 'stop'"))
|
||||||
|
.split(simple("${body[issues]}"))
|
||||||
|
.to("jpa:"+ ValidationIssue.class.getName()+ "?usePersist=true")
|
||||||
|
.endChoice()
|
||||||
|
.end()
|
||||||
|
.choice()
|
||||||
|
.when(header("MyHeader").isEqualTo("stop"))
|
||||||
|
.process(new DataBaseProcessor())
|
||||||
|
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true")//;.to("direct:saveToDatabase")
|
||||||
|
.to("controlbus:route?routeId="+routeid2+"&action=stop")
|
||||||
|
.endChoice()
|
||||||
|
.end();
|
||||||
|
|
||||||
|
/*
|
||||||
|
from("direct:saveToDatabase")
|
||||||
|
.routeId(routeid2)
|
||||||
|
.process(new DataBaseProcessor())
|
||||||
|
.to("jpa:" + ValidationJob.class.getName() + "?useExecuteUpdate=true");
|
||||||
|
*/
|
||||||
|
/*from("timer://myTimer?fixedRate=true&period=60000")
|
||||||
|
.routeId("OAIProcessingRoute")
|
||||||
|
.setHeader("MyHeader", constant("stop"))
|
||||||
|
.choice()
|
||||||
|
.when(header("MyHeader").isEqualTo("stop"))
|
||||||
|
.toD("direct:oaiEndpoint")
|
||||||
|
.otherwise()
|
||||||
|
.stop()
|
||||||
|
.end();
|
||||||
|
|
||||||
|
from("timer://myTimer?fixedRate=true&period=600000")
|
||||||
|
.to(oaiEndpoint)
|
||||||
|
.log("hello")
|
||||||
|
.split(xpath("//*[local-name()='record']"))
|
||||||
|
.log("${body}")
|
||||||
|
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
|
||||||
|
.log("????");
|
||||||
|
|
||||||
|
/* from("direct:processXML")
|
||||||
|
//.split(xpath("//*[local-name()='record']"))
|
||||||
|
.log("${exchangeProperty.totalRecords}")
|
||||||
|
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
|
||||||
|
.log("${exchangeProperty.totalRecords}");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
String date = new Date().toString();
|
||||||
|
/*
|
||||||
|
|
||||||
|
from(oaiEndpoint)
|
||||||
|
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
|
||||||
|
.to("mock:start")
|
||||||
|
.process(new Processor() {
|
||||||
|
@Override
|
||||||
|
public void process(Exchange exchange) throws Exception {
|
||||||
|
// stop Camel by signalling to the latch
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}).to("mock:done");
|
||||||
|
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
from(oaiEndpoint)
|
||||||
|
.routeId(date)
|
||||||
|
.setHeader("MyHeader", constant("start"))
|
||||||
|
.choice()
|
||||||
|
.when(header("MyHeader").isEqualTo("stop"))
|
||||||
|
.log("Myheader " +"${header.myHeader}")
|
||||||
|
|
||||||
|
.to("controlbus:route?routeId="+date+"&action=stop")
|
||||||
|
.stop()
|
||||||
|
.endChoice()
|
||||||
|
.otherwise()
|
||||||
|
.process(exchange -> exchange.getIn().setHeader("MyHeader", "stop"))
|
||||||
|
.log("Trying to make the thread stop! " +"${header.myHeader}")
|
||||||
|
.endChoice();
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* from(oaiEndpoint)
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 5"))
|
||||||
|
.log("${exchangeProperty.totalRecords}")
|
||||||
|
//.split(xpath("//*[local-name()='record']"))
|
||||||
|
//.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
|
||||||
|
.to("direct:ListRecords")
|
||||||
|
.end();
|
||||||
|
|
||||||
|
from("direct:ListRecords")
|
||||||
|
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository));
|
||||||
|
/* from(oaiEndpoint)
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile(simple("${exchangeProperty.totalRecords} < 10"))
|
||||||
|
.split(xpath("//*[local-name()='record']")).streaming()
|
||||||
|
.log("${exchangeProperty.totalRecords}")
|
||||||
|
// .log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
|
||||||
|
.process(new XmlProcessor(profile, validationJob, validationIssueRepository, validationResultRepository))
|
||||||
|
.log("${exchangeProperty.totalRecords}")
|
||||||
|
.end().to("direct:end");
|
||||||
|
*/
|
||||||
|
/* from("oaipmh://http://repositorium.sdum.uminho.pt/oai/request?verb=ListRecords&metadataPrefix=oai_dc")
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 10")
|
||||||
|
.split(xpath("//*[local-name()='record']"))
|
||||||
|
//.log("\n\n\n----------------\n\n\n\n\n\n${body}\n\n\n----------------\n\n\n\n\n\n")
|
||||||
|
.process(new XmlProcessor(profile, validationJob))
|
||||||
|
.end().to("direct:end");*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*** kai auto from(oaiEndpoint) // trigger the route every minute
|
||||||
|
.split(xpath("//*[local-name()='record']"))
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
|
||||||
|
.process(new XmlProcessor(profile, validationJob))
|
||||||
|
.end()
|
||||||
|
.to("direct:end");
|
||||||
|
/***/
|
||||||
|
//from(oaiEndpoint).process(new XmlProcessor(profile,validationJob));
|
||||||
|
|
||||||
|
// from("direct:oaiRequest").process(new DummyXMLProcessor());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*** AUTO PAIZEI !!!!!! ***/
|
||||||
|
/* from(oaiEndpoint).
|
||||||
|
split(xpath("//*[local-name()='record']")).process(new XmlProcessor(profile, validationJob));
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*** KAI AUTO !!!!!! ***/
|
||||||
|
/*
|
||||||
|
from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
|
||||||
|
.to("direct:oaiRequest")
|
||||||
|
.process(new RecordCountProcessor())
|
||||||
|
.end()
|
||||||
|
.to("direct:end");
|
||||||
|
|
||||||
|
from("direct:oaiRequest").
|
||||||
|
process(exchange -> {
|
||||||
|
int counter = exchange.getProperty("totalRecords", 0, Integer.class);
|
||||||
|
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
|
||||||
|
counter++;
|
||||||
|
System.out.println("counter ++" + counter);
|
||||||
|
exchange.setProperty("processedRecords", counter);
|
||||||
|
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
|
||||||
|
|
||||||
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** KAI AUTO ***/
|
||||||
|
/*
|
||||||
|
from("timer://myTimer?fixedRate=true&period=60000") // trigger the route every minute
|
||||||
|
.setProperty("totalRecords", constant(0))
|
||||||
|
.loopDoWhile().simple("${exchangeProperty.totalRecords} < 50")
|
||||||
|
.to("direct:oaiRequest")
|
||||||
|
.process(new RecordCountProcessor())
|
||||||
|
.end()
|
||||||
|
.to("direct:end");
|
||||||
|
|
||||||
|
from("direct:oaiRequest").
|
||||||
|
process(exchange -> {
|
||||||
|
int counter = exchange.getProperty("totalRecords", 0, Integer.class);
|
||||||
|
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
|
||||||
|
counter++;
|
||||||
|
System.out.println("counter ++" + counter);
|
||||||
|
exchange.setProperty("processedRecords", counter);
|
||||||
|
System.out.println("Processing iteration: " + exchange.getProperty("totalRecords", 0, Integer.class));
|
||||||
|
|
||||||
|
});
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class RecordCountProcessor implements Processor {
|
||||||
|
@Override
|
||||||
|
public void process(Exchange exchange) throws Exception {
|
||||||
|
System.out.println("RecordCountProcessor reads");
|
||||||
|
int processedRecords = exchange.getProperty("processedRecords", 0, Integer.class);
|
||||||
|
exchange.setProperty("totalRecords", processedRecords);
|
||||||
|
|
||||||
|
}
|
||||||
|
}}
|
Loading…
Reference in New Issue