package org.gcube.accounting.aggregator.aggregation; import java.io.File; import java.sql.ResultSet; import java.text.DateFormat; import java.util.Calendar; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Constant; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.AggregatedUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.basetypes.AbstractServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.com.fasterxml.jackson.databind.ObjectMapper; import org.gcube.com.fasterxml.jackson.databind.node.ObjectNode; import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Luca Frosini (ISTI - CNR) */ public class Aggregator { private static Logger logger = LoggerFactory.getLogger(Aggregator.class); private static final String TMP_SUFFIX = ".tmp"; protected final AggregationStatus aggregationStatus; protected final File originalRecordsbackupFile; protected final File aggregateRecordsBackupFile; protected final File malformedRecordsFile; protected int malformedRecordNumber; protected ObjectMapper objectMapper; protected Calendar startTime; public Aggregator(AggregationStatus aggregationStatus, File originalRecordsbackupFile, File aggregateRecordsBackupFile) { this.aggregationStatus = aggregationStatus; this.originalRecordsbackupFile = originalRecordsbackupFile; this.aggregateRecordsBackupFile = aggregateRecordsBackupFile; this.malformedRecordsFile = Utility.getMalformatedFile(aggregateRecordsBackupFile); this.objectMapper = new ObjectMapper(); } public void aggregate() throws Exception { if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.STARTED)) { startTime = Utility.getUTCCalendarInstance(); // TODO query ResultSet resultSet = null; retrieveAndAggregate(resultSet); } } protected ResultSet getViewResult() throws Exception { DateFormat dateFormat = aggregationStatus.getAggregationInfo().getAggregationType().getDateFormat(); String dateStartKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationStartDate()); String dateEndKey = dateFormat.format(aggregationStatus.getAggregationInfo().getAggregationEndDate()); // TODO query here return null; } private static final String USAGE_RECORD_TYPE = "usageRecordType"; private static final String SINGLE = "Single"; private static final String SIMPLE = "Simple"; protected int elaborateRow(ObjectNode content, AggregatorBuffer aggregatorBuffer, int originalRecordsCounter) throws Exception { try { if(content.has(USAGE_RECORD_TYPE)){ String recordType = content.get(USAGE_RECORD_TYPE).asText(); content.remove(USAGE_RECORD_TYPE); content.put(Record.RECORD_TYPE, recordType); } Boolean aggregated = false; if(content.has(AggregatedRecord.CREATION_TIME)) { Object object = content.get(AggregatedRecord.CREATION_TIME); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.CREATION_TIME, d.longValue()); } } if(content.has(AggregatedRecord.START_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.START_TIME); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.START_TIME, d.longValue()); } } if(content.has(AggregatedRecord.END_TIME)) { aggregated = true; Object object = content.get(AggregatedRecord.END_TIME); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.END_TIME, d.longValue()); } } if(content.has(AggregatedRecord.OPERATION_COUNT)) { Object object = content.get(AggregatedRecord.OPERATION_COUNT); if(object instanceof Double) { Double d = ((Double) object); content.put(AggregatedRecord.OPERATION_COUNT, d.intValue()); } if(content.get(AggregatedRecord.OPERATION_COUNT).asInt()>1) { aggregated = true; } } if(aggregated) { content.put(AggregatedRecord.AGGREGATED, true); } String recordType = content.get(Record.RECORD_TYPE).asText(); if(!aggregated){ if(recordType.startsWith(SIMPLE)){ recordType = recordType.replace(SIMPLE, SINGLE); content.put(Record.RECORD_TYPE, recordType); } if(!recordType.startsWith(SINGLE)) { recordType = SINGLE + recordType; content.put(Record.RECORD_TYPE, recordType); } }else { if(recordType.startsWith(SIMPLE)){ recordType = recordType.replace(SIMPLE, ""); content.put(Record.RECORD_TYPE, recordType); } if(recordType.startsWith(SINGLE)) { recordType = recordType.replace(SINGLE, ""); content.put(Record.RECORD_TYPE, recordType); } } String record = content.toString(); // Aggregate the Record aggregateRow(aggregatorBuffer, record); ++originalRecordsCounter; if(originalRecordsCounter%1000==0){ int aggregatedRecordsNumber = aggregatorBuffer.getAggregatedRecords().size(); int diff = originalRecordsCounter - aggregatedRecordsNumber; float percentage = (100 * diff) / originalRecordsCounter; logger.info("{} At the moment, the elaborated original records are {}. The Aggregated records are {}. Difference {}. We are recovering {}% of Documents", aggregationStatus.getAggregationInfo(), originalRecordsCounter, aggregatedRecordsNumber, diff, percentage); } Utility.printLine(originalRecordsbackupFile, record); return originalRecordsCounter; }catch (Exception e) { throw e; } } private static final int MAX_RETRY = 3; protected void retrieveAndAggregate(ResultSet resultSet) throws Exception { AggregatorBuffer aggregatorBuffer = new AggregatorBuffer(); Calendar start = Utility.getUTCCalendarInstance(); logger.debug("Elaboration of Records started at {}", Constant.DEFAULT_DATE_FORMAT.format(start.getTime())); originalRecordsbackupFile.delete(); aggregateRecordsBackupFile.delete(); malformedRecordsFile.delete(); malformedRecordNumber = 0; int originalRecordsCounter = 0; while (resultSet.next()) { for(int i=1; i<=MAX_RETRY; i++){ try { ObjectNode content = objectMapper.createObjectNode(); // todo set data from resultset originalRecordsCounter = elaborateRow(content, aggregatorBuffer, originalRecordsCounter); TimeUnit.MILLISECONDS.sleep(3); break; }catch (RuntimeException e) { if(i==2){ logger.error("Unable to elaborate row {}. Tryed {} times.", i, e); } } } } Calendar end = Utility.getUTCCalendarInstance(); long duration = end.getTimeInMillis() - start.getTimeInMillis(); String durationForHuman = Utility.getHumanReadableDuration(duration); logger.debug("{} Elaboration of Records terminated at {}. Duration {}", aggregationStatus.getAggregationInfo(), Constant.DEFAULT_DATE_FORMAT.format(end.getTime()), durationForHuman); File aggregateRecordsBackupFileTmp = new File(aggregateRecordsBackupFile.getParent(), aggregateRecordsBackupFile.getName() + TMP_SUFFIX); aggregateRecordsBackupFileTmp.delete(); // Saving Aggregated Record on local file logger.debug("Going to save {} to file {}", AggregatedUsageRecord.class.getSimpleName(), aggregateRecordsBackupFile); List> aggregatedRecords = aggregatorBuffer.getAggregatedRecords(); for (AggregatedRecord aggregatedRecord : aggregatedRecords) { String marshalled = DSMapper.marshal(aggregatedRecord); Utility.printLine(aggregateRecordsBackupFileTmp, marshalled); } aggregateRecordsBackupFileTmp.renameTo(aggregateRecordsBackupFile); aggregationStatus.setRecordNumbers(originalRecordsCounter, aggregatedRecords.size(), malformedRecordNumber); aggregationStatus.setAggregationState(AggregationState.AGGREGATED, startTime, true); } protected void aggregateRow(AggregatorBuffer aggregatorBuffer, String json) throws Exception { Record record = RecordUtility.getRecord(json); try { record.validate(); }catch (InvalidValueException e) { ++malformedRecordNumber; Utility.printLine(malformedRecordsFile, json); if(record instanceof AggregatedServiceUsageRecord){ try { if(record.getResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME)==null){ record.setResourceProperty(AggregatedServiceUsageRecord.MIN_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION)); } if(record.getResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME)==null) { record.setResourceProperty(AggregatedServiceUsageRecord.MAX_INVOCATION_TIME, record.getResourceProperty(ServiceUsageRecord.DURATION)); } if(record.getResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER)==null) { record.setResourceProperty(AggregatedServiceUsageRecord.CALLER_QUALIFIER, AbstractServiceUsageRecord.UNKNOWN); } record.validate(); } catch (Exception ex) { return; } } else { return; } } record.setId(UUID.randomUUID().toString()); @SuppressWarnings("rawtypes") AggregatedRecord aggregatedRecord = AggregatorBuffer.getAggregatedRecord(record); aggregatorBuffer.aggregate(aggregatedRecord); } }