39 lines
1.3 KiB
Java
39 lines
1.3 KiB
Java
package eu.dnetlib.validatorapi;
|
|
|
|
import org.apache.camel.AggregationStrategy;
|
|
import org.apache.camel.Exchange;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
|
|
public class RecordAggregationStrategy implements AggregationStrategy {
|
|
private static final int MAX_RECORDS = 2;
|
|
|
|
@Override
|
|
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
|
if (oldExchange == null) {
|
|
// First record, create a new ArrayList to store records
|
|
newExchange.getIn().setHeader("recordList", new ArrayList<>());
|
|
return newExchange;
|
|
}
|
|
|
|
// Get the list of records from the oldExchange
|
|
List<String> recordList = oldExchange.getIn().getHeader("recordList", List.class);
|
|
// Add the new record to the list
|
|
recordList.add(newExchange.getIn().getBody(String.class));
|
|
|
|
if (recordList.size() >= MAX_RECORDS) {
|
|
// Set the aggregated records as the message body
|
|
oldExchange.getIn().setBody(recordList);
|
|
// Remove the property to signal the route to stop
|
|
oldExchange.removeProperty(Exchange.ROUTE_STOP);
|
|
} else {
|
|
// Store the updated list back in the header for the next iteration
|
|
oldExchange.getIn().setHeader("recordList", recordList);
|
|
}
|
|
|
|
return oldExchange;
|
|
}
|
|
}
|
|
|