accounting-aggregator-se-pl.../src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java

55 lines
1.9 KiB
Java

package org.gcube.accounting.aggregator.persist;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode;
import org.gcube.documentstore.records.DSMapper;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class DeleteDocument extends DocumentElaboration {
protected AggregatorPersistenceSrc aggregatorPersistenceSrc;
protected ArrayNode arrayNode;
public DeleteDocument(AggregationStatus aggregationStatus, File file) throws Exception{
super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber());
arrayNode = DSMapper.getObjectMapper().createArrayNode();
aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
}
@Override
protected void elaborateLine(String line) throws Exception {
JsonNode jsonNode = DSMapper.asJsonNode(line);
if(aggregatorPersistenceSrc.isBulkDeleteAllowed()) {
arrayNode.add(jsonNode);
if(arrayNode.size()>=effectiveMaxRowPerStep) {
aggregatorPersistenceSrc.deleteRecords(arrayNode);
arrayNode = DSMapper.getObjectMapper().createArrayNode();
}
} else {
String id = jsonNode.get(ID).asText();
logger.trace("Going to delete record with id {}", id);
aggregatorPersistenceSrc.deleteRecord(jsonNode);
TimeUnit.MILLISECONDS.sleep(2);
}
}
@Override
protected void afterElaboration() throws Exception {
// Nothing to do
if(aggregatorPersistenceSrc.isBulkDeleteAllowed() && arrayNode.size()>0) {
aggregatorPersistenceSrc.deleteRecords(arrayNode);
arrayNode = DSMapper.getObjectMapper().createArrayNode();
}
}
}