package org.gcube.accounting.aggregator.persist;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;

import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.documentstore.records.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for elaborations that process a file one line at a time.
 *
 * <p>{@link #elaborate()} reads {@link #file} line by line, hands every line to
 * {@link #elaborateLine(String)} (retrying up to {@link #MAX_RETRY} times), periodically stores
 * the number of processed rows in the {@link AggregationStatus} and finally sets
 * {@link #finalAggregationState} on it. {@link #afterElaboration()} is always invoked at the end.
 *
 * @author Luca Frosini (ISTI - CNR)
 */
public abstract class DocumentElaboration {

    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    protected static final String ID = Record.ID;

    /** Maximum number of attempts made to elaborate a single line before giving up. */
    public static final int MAX_RETRY = 7;

    /** Upper bound for the number of rows elaborated between two progress checkpoints. */
    public static final int MAX_ROWS_PER_STEP = 500;

    /** Base delay between retries; the n-th failed attempt waits n times this value. */
    private static final long RETRY_DELAY_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);

    protected final AggregationStatus aggregationStatus;
    protected final File file;
    protected final AggregationState finalAggregationState;

    /** Number of rows the file is expected to contain. */
    protected final int rowToBeElaborated;

    /** Number of rows of the file consumed so far (skipped rows included). */
    protected int currentlyElaborated;

    /** Set by {@link #elaborate()} right before the file is read. */
    protected Calendar startTime;

    /** Rows between two checkpoints: {@code rowToBeElaborated / 10 + 1}, capped at {@link #MAX_ROWS_PER_STEP}. */
    protected int effectiveMaxRowPerStep;

    /**
     * @param statusManager status object updated while the elaboration proceeds
     * @param finalAggregationState state set on {@code statusManager} once the file has been read successfully
     * @param file the file to elaborate line by line
     * @param rowToBeElaborated the number of rows {@code file} is expected to contain
     */
    protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState,
            File file, int rowToBeElaborated) {
        this.aggregationStatus = statusManager;
        this.finalAggregationState = finalAggregationState;
        this.file = file;
        this.rowToBeElaborated = rowToBeElaborated;
        this.currentlyElaborated = 0;
    }

    /**
     * Reads {@link #file} line by line and elaborates each line.
     *
     * <p>Rows whose index is lower than {@code aggregationStatus.getRestartFrom()} are counted but not
     * elaborated. Every {@link #effectiveMaxRowPerStep} rows the progress is logged and, once past the
     * restart point, stored through {@code aggregationStatus.setRestartFrom(...)}.
     *
     * @throws Exception if a line cannot be elaborated after {@link #MAX_RETRY} attempts, if the number
     *         of rows read differs from {@link #rowToBeElaborated}, or if the file cannot be read
     */
    protected void readFile() throws Exception {
        // try-with-resources closes the streams in reverse order and keeps the primary exception
        // when close() itself fails.
        try (FileInputStream fstream = new FileInputStream(file);
                DataInputStream in = new DataInputStream(fstream);
                BufferedReader br = new BufferedReader(new InputStreamReader(in))) {

            logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(),
                    rowToBeElaborated);

            effectiveMaxRowPerStep = (rowToBeElaborated / 10) + 1;
            if (effectiveMaxRowPerStep > MAX_ROWS_PER_STEP) {
                effectiveMaxRowPerStep = MAX_ROWS_PER_STEP;
            }

            currentlyElaborated = 0;
            int restartFrom = aggregationStatus.getRestartFrom();
            if (restartFrom > 0) {
                logger.info("The elaboration will be restarted from record number {}",
                        aggregationStatus.getRestartFrom());
            }

            String line;
            // Read File Line By Line
            while ((line = br.readLine()) != null) {
                // NOTE(review): restartFrom is presumably the number of rows already elaborated by a
                // previous run; rows before it are skipped. Confirm against AggregationStatus.
                if (currentlyElaborated >= restartFrom) {
                    elaborateLineWithRetry(line);
                }

                ++currentlyElaborated;

                // Checked before computing the percentage so that rowToBeElaborated == 0 fails with
                // this message instead of a division by zero.
                if (currentlyElaborated > rowToBeElaborated) {
                    throw new Exception("Elaborated file line is number " + currentlyElaborated + " > "
                            + rowToBeElaborated
                            + " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
                }

                if (currentlyElaborated % effectiveMaxRowPerStep == 0) {
                    // Never move the stored restart point backwards while skipping rows.
                    if (currentlyElaborated >= restartFrom) {
                        aggregationStatus.setRestartFrom(currentlyElaborated, true);
                    }
                    // long arithmetic: currentlyElaborated * 100 overflows int for large files.
                    long elaboratedPercentage = (long) currentlyElaborated * 100 / rowToBeElaborated;
                    logger.info("{} - Elaborated {} rows of {} (about {}%)",
                            aggregationStatus.getAggregationInfo(), currentlyElaborated, rowToBeElaborated,
                            elaboratedPercentage);
                }
            }

            if (currentlyElaborated != rowToBeElaborated) {
                throw new Exception("Elaborated file line is number " + currentlyElaborated + " != "
                        + rowToBeElaborated
                        + "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
            }

            logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(),
                    currentlyElaborated, rowToBeElaborated, 100);

        } catch (Exception e) {
            logger.error("Error while elaborating file {}", file.getAbsolutePath(), e);
            throw e;
        }
    }

    /**
     * Elaborates one line, retrying on failure with a linearly growing delay.
     *
     * @param line the line to pass to {@link #elaborateLine(String)}
     * @throws Exception the last failure once {@link #MAX_RETRY} attempts have been made, or an
     *         {@link InterruptedException} if the thread is interrupted
     */
    private void elaborateLineWithRetry(String line) throws Exception {
        for (int attempt = 1;; ++attempt) {
            try {
                elaborateLine(line);
                return;
            } catch (InterruptedException e) {
                // Retrying an interrupted elaboration would ignore the interruption request.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (attempt >= MAX_RETRY) {
                    throw e;
                }
                long sleepTime = attempt * RETRY_DELAY_MILLIS;
                logger.warn("Elaboration of row {} failed (attempt {} of {}). Retrying in {} ms",
                        currentlyElaborated + 1, attempt, MAX_RETRY, sleepTime, e);
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    interrupted.addSuppressed(e);
                    throw interrupted;
                }
            }
        }
    }

    /**
     * Runs the whole elaboration: reads the file, sets the final aggregation state and, in any case,
     * invokes {@link #afterElaboration()}.
     *
     * @throws Exception the failure of the elaboration; if {@link #afterElaboration()} fails as well,
     *         its exception is attached as suppressed instead of replacing the original one
     */
    public void elaborate() throws Exception {
        startTime = Utility.getUTCCalendarInstance();
        Exception failure = null;
        try {
            readFile();
            aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
        } catch (Exception e) {
            failure = e;
            throw e;
        } finally {
            try {
                afterElaboration();
            } catch (Exception afterFailure) {
                if (failure == null) {
                    throw afterFailure;
                }
                // Keep the root cause visible to the caller.
                failure.addSuppressed(afterFailure);
            }
        }
    }

    /**
     * Elaborates a single line of the file.
     *
     * @param line the line read from the file
     * @throws Exception if the line cannot be elaborated; the call is retried up to {@link #MAX_RETRY} times
     */
    protected abstract void elaborateLine(String line) throws Exception;

    /**
     * Perform actions at the end of line by line elaboration
     * @throws Exception
     */
    protected abstract void afterElaboration() throws Exception;

}