Aggregator moves the records to TimeScaleDB

Luca Frosini 2021-03-12 11:24:09 +01:00
parent c2950bd135
commit cc6fea69fb
4 changed files with 54 additions and 18 deletions
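In short: where the aggregator previously upserted each aggregated JSON document into a destination Couchbase bucket, it now rebuilds a Record from each backup line and hands it to the PostgreSQL backend of document-store-lib-postgresql (backed by TimeScaleDB). A minimal sketch of the two write paths, using only the classes and calls visible in the diffs below; variable names are illustrative, not the committed code:

// Old path (removed): upsert into the destination Couchbase bucket
JsonObject jsonObject = analyseLine(line);
JsonDocument jsonDocument = JsonDocument.create(jsonObject.getString(ID), jsonObject);
bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);

// New path (added): rebuild the Record and insert it through PersistencePostgreSQL
Record record = RecordUtility.getRecord(jsonObject.toString());
persistencePostgreSQL.insert(record); // batched and committed in groups, see InsertDocument below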

View File

@@ -60,6 +60,11 @@
<artifactId>storagehub-client-library</artifactId>
<version>[1.0.0, 2.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>org.gcube.data.publishing</groupId>
<artifactId>document-store-lib-postgresql</artifactId>
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@@ -168,7 +168,7 @@ public class Elaborator {
File elaborationDirectory = fileSystemDirectoryStructure.getTargetFolder(aggregationType, aggregationStartDate);
Bucket srcBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.src);
Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
// Bucket dstBucket = CouchBaseConnector.getInstance().getBucket(recordType, aggregationInfo.getAggregationType(), SUFFIX.dst);
File originalRecordsbackupFile = getOriginalRecordsBackupFile(elaborationDirectory, recordType);
File aggregateRecordsBackupFile = getAggregatedRecordsBackupFile(originalRecordsbackupFile);
@@ -185,7 +185,8 @@
* before midnight and the second after midnight (so in the next day).
*/
if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
// Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
Persist persist = new Persist(aggregationStatus, srcBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover();
}else{
logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));

View File

@@ -5,19 +5,18 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.elaboration.Elaborator;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.documentstore.persistence.PersistenceBackendFactory;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
/**
@@ -41,15 +40,22 @@ public class InsertDocument extends DocumentElaboration {
protected boolean serviceUsageRecordElaboration;
protected File calledMethodCSVFile;
protected PersistencePostgreSQL persistencePostgreSQL;
protected int count;
public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
// public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{
// super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
super(aggregationStatus, AggregationState.ADDED, file, null, aggregationStatus.getAggregatedRecordsNumber());
serviceUsageRecordElaboration = false;
serviceClassName_calledMethods = new TreeMap<String,Map<String,Integer>>();
unparsableLines = new ArrayList<>();
File destinationFolder = file.getParentFile();
calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
persistencePostgreSQL = (PersistencePostgreSQL) PersistenceBackendFactory.getPersistenceBackend(Utility.getCurrentContext());
count = 0;
}
protected String getKey(JsonObject jsonObject) {
@@ -98,13 +104,31 @@
@Override
protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = analyseLine(line);
String id = jsonObject.getString(ID);
JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
/*
* String id = jsonObject.getString(ID);
* JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
* bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
*/
Record record = RecordUtility.getRecord(jsonObject.toString());
if(count == 0) {
persistencePostgreSQL.newConnection();
}
persistencePostgreSQL.insert(record);
++count;
if(count==100) {
persistencePostgreSQL.commitAndClose();
count = 0;
}
}
@Override
protected void afterElaboration() {
protected void afterElaboration() throws Exception {
persistencePostgreSQL.commitAndClose();
count = 0;
if(serviceUsageRecordElaboration) {
if(calledMethodCSVFile.exists()) {
calledMethodCSVFile.delete();
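The hunk above introduces the batching scheme used for the PostgreSQL writes: the first record of a batch opens a new connection, each record is staged with insert(record), after 100 records the batch is committed and the counter reset, and afterElaboration() commits whatever is left over. A compact sketch of that pattern, using the PersistencePostgreSQL calls shown in this diff; the surrounding loop, the records iterable and the batch-size constant are illustrative only:

// Sketch of the commit-every-100 pattern from elaborateLine()/afterElaboration();
// 'records' is a hypothetical iterable of already-parsed aggregated records.
final int BATCH_SIZE = 100;
int count = 0;
for (Record record : records) {
    if (count == 0) {
        persistencePostgreSQL.newConnection();    // start a new batch
    }
    persistencePostgreSQL.insert(record);         // stage the record
    if (++count == BATCH_SIZE) {
        persistencePostgreSQL.commitAndClose();   // flush the full batch
        count = 0;
    }
}
if (count > 0) {
    persistencePostgreSQL.commitAndClose();       // flush the final partial batch
}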

View File

@@ -23,27 +23,30 @@ import com.couchbase.client.java.Bucket;
*/
public class Persist {
private static Logger logger = LoggerFactory.getLogger(Persist.class);
protected static Logger logger = LoggerFactory.getLogger(Persist.class);
protected final AggregationStatus aggregationStatus;
protected final Bucket originalRecordBucket;
protected final Bucket aggregatedRecordBucket;
// protected final Bucket aggregatedRecordBucket;
protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile;
protected final String recordType;
/*
public Persist(AggregationStatus aggregationStatus,
Bucket originalRecordBucket, Bucket aggregatedRecordBucket,
File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
*/
public Persist(AggregationStatus aggregationStatus,
Bucket originalRecordBucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
super();
this.aggregationStatus = aggregationStatus;
this.originalRecordBucket = originalRecordBucket;
this.aggregatedRecordBucket = aggregatedRecordBucket;
// this.aggregatedRecordBucket = aggregatedRecordBucket;
this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
@@ -63,6 +66,7 @@ public class Persist {
public void recover() throws Exception{
if(aggregationStatus.getAggregatedRecordsNumber()==aggregationStatus.getOriginalRecordsNumber()){
/*
if(originalRecordBucket.name().compareTo(aggregatedRecordBucket.name())==0 || aggregationStatus.getAggregatedRecordsNumber()==0){
Calendar now = Utility.getUTCCalendarInstance();
logger.info("{} - OriginalRecords are {}. AggregatedRecords are {} ({}=={}). All records were already aggregated. The aggregation didn't had any effects and the Source and Destination Bucket are the same ({}) or the record number is 0. Setting {} to {}",
@@ -76,6 +80,7 @@ public class Persist {
setAggregationStateToCompleted(now);
return;
}
*/
}
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.AGGREGATED)){
@@ -85,7 +90,8 @@ public class Persist {
DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket);
deleteDocument.elaborate();
}
InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
// InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile);
boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false;
insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.DELETED)){