diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java index 17377e4..c2549af 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DeleteDocument.java @@ -7,6 +7,7 @@ import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode; import org.gcube.documentstore.records.DSMapper; /** @@ -14,22 +15,38 @@ import org.gcube.documentstore.records.DSMapper; */ public class DeleteDocument extends DocumentElaboration { - public DeleteDocument(AggregationStatus aggregationStatus, File file){ + protected AggregatorPersistenceSrc aggregatorPersistenceSrc; + protected ArrayNode arrayNode; + + public DeleteDocument(AggregationStatus aggregationStatus, File file) throws Exception{ super(aggregationStatus, AggregationState.DELETED, file, aggregationStatus.getOriginalRecordsNumber()); + arrayNode = DSMapper.getObjectMapper().createArrayNode(); + aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc(); } @Override protected void elaborateLine(String line) throws Exception { JsonNode jsonNode = DSMapper.asJsonNode(line); - String id = jsonNode.get(ID).asText(); - logger.trace("Going to delete record with id {}", id); - AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc(); - aggregatorPersistenceSrc.deleteRecord(jsonNode); + if(aggregatorPersistenceSrc.isBulkDeleteAllowed()) { + arrayNode.add(jsonNode); + if(arrayNode.size()>=MAX_ROWS_PER_STEP) { + 
aggregatorPersistenceSrc.deleteRecords(arrayNode); + arrayNode = DSMapper.getObjectMapper().createArrayNode(); + } + } else { + String id = jsonNode.get(ID).asText(); + logger.trace("Going to delete record with id {}", id); + aggregatorPersistenceSrc.deleteRecord(jsonNode); + } } @Override - protected void afterElaboration() { + protected void afterElaboration() throws Exception { // Nothing to do + if(aggregatorPersistenceSrc.isBulkDeleteAllowed() && arrayNode.size()>0) { + aggregatorPersistenceSrc.deleteRecords(arrayNode); + arrayNode = DSMapper.getObjectMapper().createArrayNode(); + } } } diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java index f411fdb..f098a33 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java @@ -26,6 +26,8 @@ public abstract class DocumentElaboration { public static final int MAX_RETRY = 7; + public static final int MAX_ROWS_PER_STEP = 500; + protected final AggregationStatus aggregationStatus; protected final File file; protected final AggregationState finalAggregationState; @@ -57,8 +59,8 @@ public abstract class DocumentElaboration { logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated); int numberOfRowsForEachRecoveryPoint = (rowToBeElaborated / 10) + 1; - if(numberOfRowsForEachRecoveryPoint>500) { - numberOfRowsForEachRecoveryPoint = 500; + if(numberOfRowsForEachRecoveryPoint>MAX_ROWS_PER_STEP) { + numberOfRowsForEachRecoveryPoint = MAX_ROWS_PER_STEP; } currentlyElaborated = 0; diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java index 84802cb..dc78a91 100644 --- 
a/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/AggregatorPersistenceSrc.java @@ -4,6 +4,7 @@ import java.sql.ResultSet; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode; /** * @author Luca Frosini (ISTI - CNR) */ @@ -14,4 +15,12 @@ public interface AggregatorPersistenceSrc extends AggregatorPersistence { public void deleteRecord(JsonNode jsonNode) throws Exception; + public boolean isBulkDeleteAllowed(); + + /** + * It must be implemented if and only if isBulkDeleteAllowed() + * returns true. It must throw UnsupportedOperationException if bulk delete is not allowed + */ + public void deleteRecords(ArrayNode array) throws UnsupportedOperationException, Exception; + } diff --git a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java index acc8f55..417bfc0 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java +++ b/src/main/java/org/gcube/accounting/aggregator/persistence/PostgreSQLConnector.java @@ -27,6 +27,7 @@ import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.utility.postgresql.RecordToDBFields; import org.gcube.accounting.utility.postgresql.RecordToDBMapping; import org.gcube.com.fasterxml.jackson.databind.JsonNode; +import org.gcube.com.fasterxml.jackson.databind.node.ArrayNode; import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; @@ -560,5 +561,50 @@ public class PostgreSQLConnector extends PersistencePostgreSQL implements Aggreg return resultSet; } + + @Override + public boolean isBulkDeleteAllowed() { + 
return true; + } + + @Override + public void deleteRecords(ArrayNode array) throws UnsupportedOperationException, Exception { + if(array.size()<1) { + return; + } + + Record record = DSMapper.unmarshal(Record.class, array.get(0).toString()); + Class clz = record.getClass(); + String type = RecordToDBMapping.getRecordTypeByClass(clz); + String tableName = RecordToDBFields.getKey(type); + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("DELETE "); + stringBuffer.append("FROM "); + stringBuffer.append(tableName); + stringBuffer.append(" WHERE "); + String id = record.getId(); + stringBuffer.append("id = "); + stringBuffer.append(getValue(id)); + + for(int i=1; i