186 lines
6.7 KiB
Java
186 lines
6.7 KiB
Java
package org.gcube.accounting.aggregator.aggregation;
|
|
|
|
import java.io.File;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.UUID;

import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.view.ViewQuery;
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
|
|
|
|
/**
|
|
* @author Luca Frosini (ISTI - CNR)
|
|
*/
|
|
public class Aggregator {
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(Aggregator.class);
|
|
|
|
private static final String TMP_SUFFIX = ".tmp";
|
|
|
|
protected final AggregationStatus aggregationStatus;
|
|
protected final Bucket bucket;
|
|
protected final File originalRecordsbackupFile;
|
|
protected final File aggregateRecordsBackupFile;
|
|
|
|
protected Calendar startTime;
|
|
|
|
public Aggregator(AggregationStatus aggregationStatus, Bucket bucket, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
|
|
this.aggregationStatus = aggregationStatus;
|
|
|
|
this.bucket = bucket;
|
|
this.originalRecordsbackupFile = originalRecordsbackupFile;
|
|
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
|
|
}
|
|
|
|
public void aggregate() throws Exception {
|
|
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
|
|
startTime = Utility.getUTCCalendarInstance();
|
|
ViewResult viewResult = getViewResult();
|
|
retrieveAndAggregate(viewResult);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Generate a key for map-reduce
|
|
* @param key
|
|
* @return
|
|
*/
|
|
protected JsonArray generateKey(String key){
|
|
JsonArray arrayKey = JsonArray.create();
|
|
for (String value : key.split("/")){
|
|
if (!value.toString().isEmpty()){
|
|
arrayKey.add(Integer.parseInt(value));
|
|
}
|
|
}
|
|
return arrayKey;
|
|
|
|
}
|
|
|
|
protected ViewResult getViewResult() throws Exception {
|
|
|
|
DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat();
|
|
|
|
String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate());
|
|
String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate());
|
|
|
|
JsonArray startKey = generateKey(dateStartKey);
|
|
JsonArray endKey = generateKey(dateEndKey);
|
|
|
|
DesignID designid = DesignID.valueOf(bucket.name());
|
|
String designDocId = designid.getDesignName();
|
|
|
|
String viewName = designid.getViewName();
|
|
|
|
ViewQuery query = ViewQuery.from(designDocId, viewName);
|
|
query.startKey(startKey);
|
|
query.endKey(endKey);
|
|
query.reduce(false);
|
|
query.inclusiveEnd(false);
|
|
|
|
logger.debug("View Query: designDocId:{} - viewName:{}, startKey:{} - endKey:{} ",
|
|
designDocId, viewName, startKey, endKey);
|
|
|
|
try {
|
|
return bucket.query(query);
|
|
} catch (Exception e) {
|
|
logger.error("Exception error VIEW", e.getLocalizedMessage(), e);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
protected void retrieveAndAggregate(ViewResult viewResult) throws Exception {
|
|
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
|
|
|
|
Calendar start = Utility.getUTCCalendarInstance();
|
|
logger.debug("Elaboration of Records started at {}", Constant.DEFAULT_DATE_FORMAT.format(start.getTime()));
|
|
|
|
originalRecordsbackupFile.delete();
|
|
aggregateRecordsBackupFile.delete();
|
|
|
|
int originalRecordsCounter = 0;
|
|
for (ViewRow row : viewResult) {
|
|
try {
|
|
String record = row.document().content().toString();
|
|
|
|
record.replace("usageRecordType", "recordType");
|
|
|
|
// Backup the Record on local file
|
|
Utility.printLine(originalRecordsbackupFile, record);
|
|
|
|
// Aggregate the Record
|
|
aggregateRow(aggregatorBuffer, record);
|
|
|
|
++originalRecordsCounter;
|
|
if(originalRecordsCounter%1000==0){
|
|
int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
|
|
int diff = originalRecordsCounter - aggregatedRecordsNumber;
|
|
float percentage = (100 * diff) / originalRecordsCounter;
|
|
logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents",
|
|
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
|
|
}
|
|
}catch (Exception e) {
|
|
logger.error("Unable to elaborate {} {}", ViewRow.class.getSimpleName(), row, e);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
Calendar end = Utility.getUTCCalendarInstance();
|
|
long duration = end.getTimeInMillis() - start.getTimeInMillis();
|
|
String durationForHuman = Utility.getHumanReadableDuration(duration);
|
|
logger.debug("{} Elaboration of Records terminated at {}. Duration {}",
|
|
aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman);
|
|
|
|
File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(),
|
|
aggregateRecordsBackupFile.getName() + TMP_SUFFIX);
|
|
aggregateRecordsBackupFileTmp.delete();
|
|
|
|
// Saving Aggregated Record on local file
|
|
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
|
|
aggregateRecordsBackupFile);
|
|
|
|
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
|
|
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
|
|
String marshalled = DSMapper.marshal(aggregatedRecord);
|
|
JsonObject jsonObject = JsonObject.fromJson(marshalled);
|
|
Utility.printLine(aggregateRecordsBackupFileTmp, jsonObject.toString());
|
|
}
|
|
|
|
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
|
|
|
|
aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size());
|
|
aggregationStatus.setState(AggregationState.AGGREGATED, startTime, true);
|
|
}
|
|
|
|
protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception {
|
|
@SuppressWarnings("rawtypes")
|
|
AggregatedRecord record = (AggregatedRecord) RecordUtility.getRecord(json);
|
|
record.setId(UUID.randomUUID().toString());
|
|
aggregatorBuffer.aggregate(record);
|
|
}
|
|
|
|
protected JsonDocument getJsonDocument(ViewRow row) {
|
|
String identifier = (String) row.document().content().get("id");
|
|
JsonDocument jsonDocument = JsonDocument.create(identifier, row.document().content());
|
|
logger.trace("{}", jsonDocument.toString());
|
|
return jsonDocument;
|
|
}
|
|
|
|
}
|