From cc6fea69fbada95ad1e6f54ec8baa09dbac2df0e Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Fri, 12 Mar 2021 11:24:09 +0100 Subject: [PATCH] Aggregator mvoe the rcords to TimeScaleDB --- pom.xml | 5 ++ .../aggregator/elaboration/Elaborator.java | 5 +- .../aggregator/persist/InsertDocument.java | 46 ++++++++++++++----- .../aggregator/persist/Persist.java | 16 +++++-- 4 files changed, 54 insertions(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index 8619aae..45c77cd 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,11 @@ storagehub-client-library [1.0.0, 2.0.0-SNAPSHOT) + + org.gcube.data.publishing + document-store-lib-postgresql + [1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT) + org.slf4j slf4j-api diff --git a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java index 32e43b1..356222f 100644 --- a/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java +++ b/src/main/java/org/gcube/accounting/aggregator/elaboration/Elaborator.java @@ -168,7 +168,7 @@ public class Elaborator { File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate); Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src); - Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst); + // Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst); File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType); File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile); @@ -185,7 +185,8 @@ public class Elaborator { * before midnight and the second after midnight (so in the next day). */ if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) { - Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); + // Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); + Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType); persist.recover(); }else{ logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime)); diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java index f10a311..13d609a 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/InsertDocument.java @@ -5,19 +5,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.elaboration.Elaborator; -import org.gcube.accounting.aggregator.persistence.CouchBaseConnector; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; +import org.gcube.documentstore.persistence.PersistenceBackendFactory; +import org.gcube.documentstore.persistence.PersistencePostgreSQL; +import org.gcube.documentstore.records.Record; +import org.gcube.documentstore.records.RecordUtility; -import com.couchbase.client.java.Bucket; -import com.couchbase.client.java.PersistTo; -import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.json.JsonObject; /** @@ -41,15 +40,22 @@ public class InsertDocument extends DocumentElaboration { protected boolean serviceUsageRecordElaboration; protected File calledMethodCSVFile; + protected PersistencePostgreSQL persistencePostgreSQL; + protected int count; - public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ - super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber()); + // public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ + public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{ + // super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber()); + super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber()); serviceUsageRecordElaboration = false; serviceClassName_calledMethods = new TreeMap>(); unparsableLines = new ArrayList<>(); File destinationFolder = file.getParentFile(); calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX)); + + persistencePostgreSQL = (PersistencePostgreSQL) PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext()); + count = 0; } protected String getKey(JsonObject jsonObject) { @@ -98,13 +104,31 @@ public class InsertDocument extends DocumentElaboration { @Override protected void elaborateLine(String line) throws Exception { JsonObject jsonObject = analyseLine(line); - String id = jsonObject.getString(ID); - JsonDocument jsonDocument = JsonDocument.create(id, jsonObject); - bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); + /* + * String id = jsonObject.getString(ID); + * JsonDocument jsonDocument = JsonDocument.create(id, jsonObject); + * bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); + */ + Record record = RecordUtility.getRecord(jsonObject.toString()); + if(count == 0) { + persistencePostgreSQL.newConnection(); + } + + persistencePostgreSQL.insert(record); + ++count; + + if(count==100) { + persistencePostgreSQL.commitAndClose(); + count = 0; + } + } @Override - protected void afterElaboration() { + protected void afterElaboration() throws Exception { + persistencePostgreSQL.commitAndClose(); + count = 0; + if(serviceUsageRecordElaboration) { if(calledMethodCSVFile.exists()) { calledMethodCSVFile.delete(); diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java index 81dd673..4ee2cc1 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/Persist.java @@ -23,27 +23,30 @@ import com.couchbase.client.java.Bucket; */ public class Persist { - private static Logger logger = LoggerFactory.getLogger(Persist.class); + protected static Logger logger = LoggerFactory.getLogger(Persist.class); protected final AggregationStatus aggregationStatus; protected final Bucket originalRecordBucket; - protected final Bucket aggregatedRecordBucket; + // protected final Bucket aggregatedRecordBucket; protected final File originalRecordsbackupFile; protected final File aggregateRecordsBackupFile; protected final String recordType; + /* public Persist(AggregationStatus aggregationStatus, Bucket originalRecordBucket, Bucket aggregatedRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { - + */ + public Persist(AggregationStatus aggregationStatus, + Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) { super(); this.aggregationStatus = aggregationStatus; this.originalRecordBucket = originalRecordBucket; - this.aggregatedRecordBucket = aggregatedRecordBucket; + // this.aggregatedRecordBucket = aggregatedRecordBucket; this.originalRecordsbackupFile = originalRecordsbackupFile; this.aggregateRecordsBackupFile = aggregateRecordsBackupFile; @@ -63,6 +66,7 @@ public class Persist { public void recover() throws Exception{ if(aggregationStatus.getAggregatedRecordsNumber()==aggregationStatus.getOriginalRecordsNumber()){ + /* if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){ Calendar now = Utility.getUTCCalendarInstance(); logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}", @@ -76,6 +80,7 @@ public class Persist { setAggregationStateToCompleted(now); return; } + */ } if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.AGGREGATED)){ @@ -85,7 +90,8 @@ public class Persist { DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket); deleteDocument.elaborate(); } - InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket); + // InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket); + InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile); boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false; insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration); if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.DELETED)){