accounting-aggregator-se-pl.../src/main/java/org/gcube/accounting/aggregator/aggregation/Aggregator.java

331 lines
11 KiB
Java

package org.gcube.accounting.aggregator.aggregation;
import java.io.File;
import java.sql.ResultSet;
import java.time.OffsetDateTime;
import java.util.Calendar;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Luca Frosini (ISTI - CNR)
*/
public class Aggregator {
private static Logger logger = LoggerFactory.getLogger(Aggregator.class);
private static final String TMP_SUFFIX = ".tmp";
protected final AggregationStatus aggregationStatus;
protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile;
protected final File malformedRecordsFile;
protected int malformedRecordNumber;
protected ObjectMapper objectMapper;
protected Calendar startTime;
public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
this.aggregationStatus = aggregationStatus;
this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile);
this.objectMapper = new ObjectMapper();
}
public void aggregate() throws Exception {
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) {
startTime = Utility.getUTCCalendarInstance();
AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();
ResultSet resultSet = aggregatorPersistenceSrc.getResultSetOfRecordToBeAggregated(aggregationStatus);
retrieveAndAggregate(resultSet);
}
}
private static final String USAGE_RECORD_TYPE = "usageRecordType";
private static final String SINGLE = "Single";
private static final String SIMPLE = "Simple";
protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception {
try {
if(content.has(USAGE_RECORD_TYPE)){
String recordType = content.get(USAGE_RECORD_TYPE).asText();
content.remove(USAGE_RECORD_TYPE);
content.put(Record.RECORD_TYPE, recordType);
}
Boolean aggregated = false;
if(content.has(AggregatedRecord.CREATION_TIME)) {
Object object = content.get(AggregatedRecord.CREATION_TIME);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.CREATION_TIME, d.longValue());
}
}
if(content.has(AggregatedRecord.START_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.START_TIME);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.START_TIME, d.longValue());
}
}
if(content.has(AggregatedRecord.END_TIME)) {
aggregated = true;
Object object = content.get(AggregatedRecord.END_TIME);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.END_TIME, d.longValue());
}
}
if(content.has(AggregatedRecord.OPERATION_COUNT)) {
Object object = content.get(AggregatedRecord.OPERATION_COUNT);
if(object instanceof Double) {
Double d = ((Double) object);
content.put(AggregatedRecord.OPERATION_COUNT, d.intValue());
}
if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) {
aggregated = true;
}
}
if(aggregated) {
content.put(AggregatedRecord.AGGREGATED, true);
}
String recordType = content.get(Record.RECORD_TYPE).asText();
if(!aggregated){
if(recordType.startsWith(SIMPLE)){
recordType = recordType.replace(SIMPLE, SINGLE);
content.put(Record.RECORD_TYPE, recordType);
}
if(!recordType.startsWith(SINGLE)) {
recordType = SINGLE + recordType;
content.put(Record.RECORD_TYPE, recordType);
}
}else {
if(recordType.startsWith(SIMPLE)){
recordType = recordType.replace(SIMPLE, "");
content.put(Record.RECORD_TYPE, recordType);
}
if(recordType.startsWith(SINGLE)) {
recordType = recordType.replace(SINGLE, "");
content.put(Record.RECORD_TYPE, recordType);
}
}
String record = content.toString();
// Aggregate the Record
aggregateRow(aggregatorBuffer, record);
++originalRecordsCounter;
if(originalRecordsCounter%1000==0){
int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size();
int diff = originalRecordsCounter - aggregatedRecordsNumber;
float percentage = (100 * diff) / originalRecordsCounter;
logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents",
aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage);
}
Utility.printLine(originalRecordsbackupFile, record);
return originalRecordsCounter;
}catch (Exception e) {
throw e;
}
}
private static final int MAX_RETRY = 3;
protected void addProperty(ObjectNode objectNode, String key, Object value) {
if(value instanceof Number) {
if(value instanceof Integer) {
objectNode.put(key, (int) value);
return;
}
Long longValue = Long.valueOf(value.toString());
objectNode.put(key, longValue);
return;
}
objectNode.put(key, (String) value.toString());
}
protected Calendar getCalendar(OffsetDateTime offsetDateTime) {
Calendar calendar = Calendar.getInstance();
long epochMillis = offsetDateTime.toInstant().toEpochMilli();
calendar.setTimeInMillis(epochMillis);
return calendar;
}
protected void retrieveAndAggregate(ResultSet resultSet) throws Exception {
AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();
Calendar start = Utility.getUTCCalendarInstance();
logger.debug("Elaboration of Records started at {}", Constant.DEFAULT_DATE_FORMAT.format(start.getTime()));
originalRecordsbackupFile.delete();
aggregateRecordsBackupFile.delete();
malformedRecordsFile.delete();
AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
String type = aggregationInfo.getRecordType();
Class<? extends AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz);
Set<String> requiredFields = clz.newInstance().getRequiredFields();
malformedRecordNumber = 0;
int originalRecordsCounter = 0;
while (resultSet.next()) {
for(int i=1; i<=MAX_RETRY; i++){
try {
ObjectNode objectNode = objectMapper.createObjectNode();
addProperty(objectNode, Record.RECORD_TYPE, type);
for(String recordField : requiredFields) {
String tableField = recordToDBFields.getTableField(recordField);
Object obj = null;
switch (recordField) {
case AggregatedRecord.START_TIME: case AggregatedRecord.END_TIME: case AggregatedRecord.CREATION_TIME:
OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class);
Calendar calendar = getCalendar(offsetDateTime);
obj = calendar.getTimeInMillis();
break;
default:
obj = resultSet.getObject(tableField);
break;
}
addProperty(objectNode, recordField, obj);
}
originalRecordsCounter = elaborateRow(objectNode, aggregatorBuffer, originalRecordsCounter);
TimeUnit.MILLISECONDS.sleep(3);
break;
}catch (RuntimeException e) {
if(i==2){
logger.error("Unable to elaborate row {}. Tryed {} times.", i, e);
}
}
}
}
Calendar end = Utility.getUTCCalendarInstance();
long duration = end.getTimeInMillis() - start.getTimeInMillis();
String durationForHuman = Utility.getHumanReadableDuration(duration);
logger.debug("{} Elaboration of Records terminated at {}. Duration {}",
aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman);
File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(),
aggregateRecordsBackupFile.getName() + TMP_SUFFIX);
aggregateRecordsBackupFileTmp.delete();
// Saving Aggregated Record on local file
logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
aggregateRecordsBackupFile);
List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
String marshalled = DSMapper.marshal(aggregatedRecord);
Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
}
aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile);
aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber);
aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true);
}
protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception {
Record record = RecordUtility.getRecord(json);
try {
record.validate();
}catch (InvalidValueException e) {
++malformedRecordNumber;
Utility.printLine(malformedRecordsFile, json);
if(record instanceof AggregatedServiceUsageRecord){
try {
if(record.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){
record.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION));
}
if(record.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) {
record.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION));
}
if(record.getResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER)==null) {
record.setResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER, AbstractServiceUsageRecord.UNKNOWN);
}
record.validate();
} catch (Exception ex) {
return;
}
} else {
return;
}
}
record.setId(UUID.randomUUID().toString());
@SuppressWarnings("rawtypes")
AggregatedRecord aggregatedRecord = AggregatorBuffer.getAggregatedRecord(record);
aggregatorBuffer.aggregate(aggregatedRecord);
}
}