diff --git a/pom.xml b/pom.xml
index 8619aae..45c77cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,11 @@
storagehub-client-library
[1.0.0, 2.0.0-SNAPSHOT)
+
+ org.gcube.data.publishing
+ document-store-lib-postgresql
+ [1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)
+
org.slf4j
slf4j-api
diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
index 32e43b1..356222f 100644
--- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
+++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java
@@ -168,7 +168,7 @@ public class Elaborator {
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
- Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
+ // Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
@@ -185,7 +185,8 @@ public class Elaborator {
* before midnight and the second after midnight (so in the next day).
*/
if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
- Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
+ // Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
+ Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover();
}else{
logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
index f10a311..13d609a 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java
@@ -5,19 +5,18 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.elaboration.Elaborator;
-import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
+import org.gcube.documentstore.persistence.PersistenceBackendFactory;
+import org.gcube.documentstore.persistence.PersistencePostgreSQL;
+import org.gcube.documentstore.records.Record;
+import org.gcube.documentstore.records.RecordUtility;
-import com.couchbase.client.java.Bucket;
-import com.couchbase.client.java.PersistTo;
-import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
/**
@@ -41,15 +40,22 @@ public class InsertDocument extends DocumentElaboration {
protected boolean serviceUsageRecordElaboration;
protected File calledMethodCSVFile;
+ protected PersistencePostgreSQL persistencePostgreSQL;
+ protected int count;
- public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
- super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
+ // public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
+ public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{
+ // super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
+ super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber());
serviceUsageRecordElaboration = false;
serviceClassName_calledMethods = new TreeMap>();
unparsableLines = new ArrayList<>();
File destinationFolder = file.getParentFile();
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
+
+ persistencePostgreSQL = (PersistencePostgreSQL) PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext());
+ count = 0;
}
protected String getKey(JsonObject jsonObject) {
@@ -98,13 +104,31 @@ public class InsertDocument extends DocumentElaboration {
@Override
protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = analyseLine(line);
- String id = jsonObject.getString(ID);
- JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
- bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+ /*
+ * String id = jsonObject.getString(ID);
+ * JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
+ * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
+ */
+ Record record = RecordUtility.getRecord(jsonObject.toString());
+ if(count == 0) {
+ persistencePostgreSQL.newConnection();
+ }
+
+ persistencePostgreSQL.insert(record);
+ ++count;
+
+ if(count==100) {
+ persistencePostgreSQL.commitAndClose();
+ count = 0;
+ }
+
}
@Override
- protected void afterElaboration() {
+ protected void afterElaboration() throws Exception {
+ persistencePostgreSQL.commitAndClose();
+ count = 0;
+
if(serviceUsageRecordElaboration) {
if(calledMethodCSVFile.exists()) {
calledMethodCSVFile.delete();
diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
index 81dd673..4ee2cc1 100644
--- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
+++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java
@@ -23,27 +23,30 @@ import com.couchbase.client.java.Bucket;
*/
public class Persist {
- private static Logger logger = LoggerFactory.getLogger(Persist.class);
+ protected static Logger logger = LoggerFactory.getLogger(Persist.class);
protected final AggregationStatus aggregationStatus;
protected final Bucket originalRecordBucket;
- protected final Bucket aggregatedRecordBucket;
+ // protected final Bucket aggregatedRecordBucket;
protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile;
protected final String recordType;
+ /*
public Persist(AggregationStatus aggregationStatus,
Bucket originalRecordBucket, Bucket aggregatedRecordBucket,
File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
-
+ */
+ public Persist(AggregationStatus aggregationStatus,
+ Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
super();
this.aggregationStatus = aggregationStatus;
this.originalRecordBucket = originalRecordBucket;
- this.aggregatedRecordBucket = aggregatedRecordBucket;
+ // this.aggregatedRecordBucket = aggregatedRecordBucket;
this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
@@ -63,6 +66,7 @@ public class Persist {
public void recover() throws Exception{
if(aggregationStatus.getAggregatedRecordsNumber()==aggregationStatus.getOriginalRecordsNumber()){
+ /*
if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){
Calendar now = Utility.getUTCCalendarInstance();
logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}",
@@ -76,6 +80,7 @@ public class Persist {
setAggregationStateToCompleted(now);
return;
}
+ */
}
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.AGGREGATED)){
@@ -85,7 +90,8 @@ public class Persist {
DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket);
deleteDocument.elaborate();
}
- InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
+ // InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
+ InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile);
boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false;
insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.DELETED)){