package org.gcube.accounting.aggregator.aggregation;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.sql.ResultSet;
import java.time.OffsetDateTime;
import java.util.Calendar;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceSrc;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Constant;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.AggregatedUsageRecord;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.utility.postgresql.RecordToDBFields;
import org.gcube.accounting.utility.postgresql.RecordToDBMapping;
import org.gcube.com.fasterxml.jackson.databind.ObjectMapper;
import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode;
import org.gcube.documentstore.exception.InvalidValueException;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads the records selected by an {@link AggregationStatus} from the source
 * persistence, writes every elaborated record to a backup file and, unless
 * aggregation is skipped, feeds the records into an {@link AggregatorBuffer}
 * whose content is finally written to the aggregated-records backup file.
 *
 * @author Luca Frosini (ISTI - CNR)
 */
public class Aggregator {

    private static final Logger logger = LoggerFactory.getLogger(Aggregator.class);

    /** Suffix of the temporary file the aggregated records are first written to. */
    private static final String TMP_SUFFIX = ".tmp";

    /** Alternative property name for the record type; renamed to {@link Record#RECORD_TYPE}. */
    private static final String USAGE_RECORD_TYPE = "usageRecordType";
    private static final String SINGLE = "Single";
    private static final String SIMPLE = "Simple";

    /** Number of attempts made to elaborate a single row before giving up on it. */
    private static final int MAX_RETRY = 3;

    /** A progress line is logged every time this many records have been elaborated. */
    private static final int PROGRESS_LOG_INTERVAL = 1000;

    protected final AggregationStatus aggregationStatus;
    protected final File originalRecordsbackupFile;
    protected final File aggregateRecordsBackupFile;
    protected final File malformedRecordsFile;

    protected int estimatedRecordsNumber;
    protected int originalRecordsNumber;
    protected int malformedRecordNumber;

    protected ObjectMapper objectMapper;

    protected Calendar startTime;

    protected boolean skipAggregation;

    /**
     * @param aggregationStatus the status describing what has to be aggregated
     * @param originalRecordsbackupFile the file receiving one line per elaborated source record
     * @param aggregateRecordsBackupFile the file receiving the aggregated records; the
     *        malformed-records file is derived from it via {@code Utility.getMalformatedFile}
     */
    public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile,
            File aggregateRecordsBackupFile) {
        this.aggregationStatus = aggregationStatus;
        this.originalRecordsbackupFile = originalRecordsbackupFile;
        this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
        this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile);
        this.objectMapper = new ObjectMapper();
    }

    /**
     * @param skipAggregation when {@code true} records are only backed up and the
     *        aggregated-records file becomes a copy of the original-records file
     */
    public void setSkipAggregation(boolean skipAggregation) {
        this.skipAggregation = skipAggregation;
    }

    /**
     * Runs the aggregation if the current state allows continuing from
     * {@link AggregationState#STARTED}; otherwise does nothing.
     *
     * @throws Exception if retrieving or elaborating the records fails
     */
    public void aggregate() throws Exception {
        if (!AggregationState.canContinue(aggregationStatus.getAggregationState(), AggregationState.STARTED)) {
            return;
        }

        startTime = Utility.getUTCCalendarInstance();

        AggregatorPersistenceSrc aggregatorPersistenceSrc = AggregatorPersistenceFactory.getAggregatorPersistenceSrc();

        estimatedRecordsNumber = aggregatorPersistenceSrc.getEstimatedRecordRecordToBeAggregated(aggregationStatus);
        logger.info("Estimated records to be aggregated are {}", estimatedRecordsNumber);

        // The result set is released as soon as the elaboration ends, whether it succeeds or fails.
        try (ResultSet resultSet = aggregatorPersistenceSrc.getResultSetOfRecordToBeAggregated(aggregationStatus)) {
            retrieveAndAggregate(resultSet);
        }
    }

    /**
     * Normalises one record, aggregates it (unless aggregation is skipped), updates
     * the counters and appends the record to the original-records backup file.
     *
     * <p>The record is flagged as aggregated when it has a start time, an end time or
     * an operation count greater than one. The record type is then adjusted: a
     * non-aggregated record gets the {@code Single} prefix (replacing {@code Simple}
     * if present), an aggregated record loses both prefixes.
     *
     * @param content the record as a JSON object; it is modified in place
     * @param aggregatorBuffer the buffer collecting the aggregated records
     * @throws Exception if aggregation or writing the backup line fails
     */
    protected void elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer) throws Exception {
        if (content.has(USAGE_RECORD_TYPE)) {
            String legacyType = content.get(USAGE_RECORD_TYPE).asText();
            content.remove(USAGE_RECORD_TYPE);
            content.put(Record.RECORD_TYPE, legacyType);
        }

        boolean aggregated = false;

        if (content.has(AggregatedRecord.CREATION_TIME)) {
            truncateToLong(content, AggregatedRecord.CREATION_TIME);
        }

        if (content.has(AggregatedRecord.START_TIME)) {
            aggregated = true;
            truncateToLong(content, AggregatedRecord.START_TIME);
        }

        if (content.has(AggregatedRecord.END_TIME)) {
            aggregated = true;
            truncateToLong(content, AggregatedRecord.END_TIME);
        }

        if (content.has(AggregatedRecord.OPERATION_COUNT)) {
            if (content.get(AggregatedRecord.OPERATION_COUNT).isFloatingPointNumber()) {
                content.put(AggregatedRecord.OPERATION_COUNT,
                        content.get(AggregatedRecord.OPERATION_COUNT).intValue());
            }
            if (content.get(AggregatedRecord.OPERATION_COUNT).asInt() > 1) {
                aggregated = true;
            }
        }

        if (aggregated) {
            content.put(AggregatedRecord.AGGREGATED, true);
        }

        String recordType = content.get(Record.RECORD_TYPE).asText();

        if (!aggregated) {
            if (recordType.startsWith(SIMPLE)) {
                recordType = recordType.replace(SIMPLE, SINGLE);
                content.put(Record.RECORD_TYPE, recordType);
            }
            if (!recordType.startsWith(SINGLE)) {
                recordType = SINGLE + recordType;
                content.put(Record.RECORD_TYPE, recordType);
            }
        } else {
            if (recordType.startsWith(SIMPLE)) {
                recordType = recordType.replace(SIMPLE, "");
                content.put(Record.RECORD_TYPE, recordType);
            }
            if (recordType.startsWith(SINGLE)) {
                recordType = recordType.replace(SINGLE, "");
                content.put(Record.RECORD_TYPE, recordType);
            }
        }

        String record = content.toString();

        if (!skipAggregation) {
            // Aggregate the Record
            aggregateRow(aggregatorBuffer, record);
        }

        ++originalRecordsNumber;
        if (originalRecordsNumber % PROGRESS_LOG_INTERVAL == 0) {
            int aggregatedRecordsNumber = skipAggregation
                    ? originalRecordsNumber
                    : aggregatorBuffer.getAggregatedRecords().size();
            int diff = originalRecordsNumber - aggregatedRecordsNumber;
            // Float arithmetic, otherwise the integer division drops the decimals.
            float percentage = (100f * diff) / originalRecordsNumber;
            logger.info("{} At the moment, the elaborated original records are {} (Total Estimated Number is {}). The Aggregated records are {}. Difference {}. We are recovering {}% of Records",
                    aggregationStatus.getAggregationInfo(), originalRecordsNumber, estimatedRecordsNumber,
                    aggregatedRecordsNumber, diff, percentage);
        }

        Utility.printLine(originalRecordsbackupFile, record);
    }

    /**
     * Replaces a floating point value stored under {@code field} with its value
     * truncated to a long; any other kind of value is left untouched.
     */
    private static void truncateToLong(ObjectNode content, String field) {
        if (content.get(field).isFloatingPointNumber()) {
            content.put(field, content.get(field).longValue());
        }
    }

    /**
     * Stores {@code value} under {@code key}: an {@link Integer} as an int, any other
     * {@link Number} as a long (fractional values are truncated), anything else as
     * its {@code toString()} representation.
     *
     * @param objectNode the JSON object to add the property to
     * @param key the property name
     * @param value the property value; a {@code null} value results in a
     *        {@link NullPointerException}
     */
    protected void addProperty(ObjectNode objectNode, String key, Object value) {
        if (value instanceof Integer) {
            objectNode.put(key, ((Integer) value).intValue());
            return;
        }
        if (value instanceof Number) {
            // longValue() also copes with Double/Float/BigDecimal, whose string
            // representation cannot be parsed as a long.
            objectNode.put(key, ((Number) value).longValue());
            return;
        }
        objectNode.put(key, value.toString());
    }

    /**
     * @param offsetDateTime the instant to convert
     * @return a calendar set to the same instant as {@code offsetDateTime}
     */
    protected Calendar getCalendar(OffsetDateTime offsetDateTime) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(offsetDateTime.toInstant().toEpochMilli());
        return calendar;
    }

    /**
     * Iterates over {@code resultSet}, elaborates every row (up to {@value #MAX_RETRY}
     * attempts per row) and then writes the outcome to the aggregated-records backup
     * file, updating the record numbers and the state of the aggregation status.
     *
     * <p>A row that keeps failing with a {@link RuntimeException} is logged and
     * skipped; any other exception aborts the whole elaboration.
     *
     * @param resultSet the rows to be aggregated; it is not closed by this method
     * @throws Exception if reading the rows or writing the files fails
     */
    protected void retrieveAndAggregate(ResultSet resultSet) throws Exception {
        AggregatorBuffer aggregatorBuffer = new AggregatorBuffer();

        Calendar start = Utility.getUTCCalendarInstance();
        logger.debug("Elaboration of Records started at {}", Constant.DEFAULT_DATE_FORMAT.format(start.getTime()));

        // Start from a clean slate: leftovers of a previous run are removed.
        originalRecordsbackupFile.delete();
        aggregateRecordsBackupFile.delete();
        malformedRecordsFile.delete();

        AggregationInfo aggregationInfo = aggregationStatus.getAggregationInfo();
        String type = aggregationInfo.getRecordType();
        Class<? extends AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(type);
        RecordToDBFields recordToDBFields = RecordToDBMapping.getRecordToDBFields(clz);

        Set<String> requiredFields = clz.getDeclaredConstructor().newInstance().getRequiredFields();

        originalRecordsNumber = 0;
        malformedRecordNumber = 0;

        while (resultSet.next()) {
            for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
                try {
                    ObjectNode objectNode = readRow(resultSet, type, requiredFields, recordToDBFields);
                    elaborateRow(objectNode, aggregatorBuffer);
                    TimeUnit.MILLISECONDS.sleep(3);
                    break;
                } catch (RuntimeException e) {
                    // Only the last failed attempt is reported; the row is then skipped.
                    if (attempt == MAX_RETRY) {
                        logger.error("Unable to elaborate row. Tried {} times.", MAX_RETRY, e);
                    }
                }
            }
        }

        Calendar end = Utility.getUTCCalendarInstance();
        long duration = end.getTimeInMillis() - start.getTimeInMillis();
        String durationForHuman = Utility.getHumanReadableDuration(duration);
        logger.debug("{} Elaboration of Records terminated at {}. Duration {}",
                aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()),
                durationForHuman);

        if (!skipAggregation) {
            File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(),
                    aggregateRecordsBackupFile.getName() + TMP_SUFFIX);
            aggregateRecordsBackupFileTmp.delete();

            // Saving Aggregated Record on local file
            logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(),
                    aggregateRecordsBackupFile);

            List<AggregatedRecord<?, ?>> aggregatedRecords = aggregatorBuffer.getAggregatedRecords();
            for (AggregatedRecord<?, ?> aggregatedRecord : aggregatedRecords) {
                String marshalled = DSMapper.marshal(aggregatedRecord);
                Utility.printLine(aggregateRecordsBackupFileTmp, marshalled);
            }

            // Files.move reports a failure with an exception instead of a silently
            // ignored boolean. The temporary file may be missing when nothing has
            // been written; in that case there is nothing to move.
            if (aggregateRecordsBackupFileTmp.exists()) {
                Files.move(aggregateRecordsBackupFileTmp.toPath(), aggregateRecordsBackupFile.toPath(),
                        StandardCopyOption.REPLACE_EXISTING);
            }

            aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecords.size(),
                    malformedRecordNumber);
        } else {
            Files.copy(originalRecordsbackupFile.toPath(), aggregateRecordsBackupFile.toPath(),
                    StandardCopyOption.REPLACE_EXISTING);
            aggregationStatus.setRecordNumbers(originalRecordsNumber, originalRecordsNumber,
                    malformedRecordNumber);
        }

        aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true);
    }

    /**
     * Builds the JSON object for the current row of {@code resultSet}: the record type
     * plus every required field, with the time fields expressed as epoch milliseconds.
     */
    private ObjectNode readRow(ResultSet resultSet, String type, Set<String> requiredFields,
            RecordToDBFields recordToDBFields) throws Exception {
        ObjectNode objectNode = objectMapper.createObjectNode();
        addProperty(objectNode, Record.RECORD_TYPE, type);

        for (String recordField : requiredFields) {
            String tableField = recordToDBFields.getTableField(recordField);
            Object obj;
            switch (recordField) {
                case AggregatedRecord.START_TIME:
                case AggregatedRecord.END_TIME:
                case AggregatedRecord.CREATION_TIME:
                    OffsetDateTime offsetDateTime = resultSet.getObject(tableField, OffsetDateTime.class);
                    Calendar calendar = getCalendar(offsetDateTime);
                    obj = calendar.getTimeInMillis();
                    break;

                default:
                    obj = resultSet.getObject(tableField);
                    break;
            }
            addProperty(objectNode, recordField, obj);
        }
        return objectNode;
    }

    /**
     * Validates the record described by {@code json} and adds it to the buffer.
     *
     * <p>An invalid record is counted as malformed and written to the malformed-records
     * file. If it is an {@link AggregatedServiceUsageRecord}, its missing min/max
     * invocation time and caller qualifier are filled in and it is validated again;
     * when that succeeds the record is aggregated anyway. Any other invalid record is
     * discarded.
     *
     * @param aggregatorBuffer the buffer collecting the aggregated records
     * @param json the JSON representation of the record
     * @throws Exception if the record cannot be parsed or aggregated
     */
    protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception {
        Record record = RecordUtility.getRecord(json);
        try {
            record.validate();
        } catch (InvalidValueException e) {
            ++malformedRecordNumber;
            Utility.printLine(malformedRecordsFile, json);

            if (!(record instanceof AggregatedServiceUsageRecord)) {
                return;
            }

            try {
                if (record.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME) == null) {
                    record.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME,
                            record.getResourceProperty(ServiceUsageRecord.DURATION));
                }
                if (record.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME) == null) {
                    record.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME,
                            record.getResourceProperty(ServiceUsageRecord.DURATION));
                }
                if (record.getResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER) == null) {
                    record.setResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER,
                            AbstractServiceUsageRecord.UNKNOWN);
                }
                record.validate();
            } catch (Exception ignored) {
                // The record could not be repaired; it has already been stored among
                // the malformed ones, so it is simply left out of the aggregation.
                return;
            }
        }

        record.setId(UUID.randomUUID().toString());

        @SuppressWarnings("rawtypes")
        AggregatedRecord aggregatedRecord = AggregatorBuffer.getAggregatedRecord(record);
        aggregatorBuffer.aggregate(aggregatedRecord);
    }

}