From f0332ebd4d67c4bc18a746bc76d49cb66312756d Mon Sep 17 00:00:00 2001 From: Luca Frosini Date: Thu, 22 Feb 2018 14:31:35 +0000 Subject: [PATCH] Refs #11258 Added retry when inserting or deleting documents git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@164525 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../persist/DocumentElaboration.java | 99 ++++++++++++------- 1 file changed, 65 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java index 8d57471..8449d92 100644 --- a/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java +++ b/src/main/java/org/gcube/accounting/aggregator/persist/DocumentElaboration.java @@ -6,6 +6,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Calendar; +import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; @@ -20,13 +21,15 @@ import com.couchbase.client.java.Bucket; * @author Luca Frosini (ISTI - CNR) */ public abstract class DocumentElaboration { - + protected Logger logger = LoggerFactory.getLogger(this.getClass()); protected static final String ID = Record.ID; protected static final int THRESHOLD_FOR_FIVE_PERCENT = 100000; + public static final int MAX_RETRY = 8; + protected final AggregationStatus aggregationStatus; protected final File file; protected final Bucket bucket; @@ -36,7 +39,8 @@ public abstract class DocumentElaboration { protected Calendar startTime; - protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, Bucket bucket, int rowToBeElaborated){ + protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, + Bucket bucket, int rowToBeElaborated) { this.aggregationStatus = statusManager; this.finalAggregationState = finalAggregationState; this.file = file; @@ -45,59 +49,86 @@ public abstract class DocumentElaboration { } protected void readFile() throws Exception { + FileInputStream fstream = null; + DataInputStream in = null; + BufferedReader br = null; try { // Open the file that is the first // command line parameter - FileInputStream fstream = new FileInputStream(file); + fstream = new FileInputStream(file); // Get the object of DataInputStream - DataInputStream in = new DataInputStream(fstream); - BufferedReader br = new BufferedReader(new InputStreamReader(in)); - + in = new DataInputStream(fstream); + br = new BufferedReader(new InputStreamReader(in)); + logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated); - int percentOfNumberOfRows = (rowToBeElaborated/10)+1; - if(rowToBeElaborated>=THRESHOLD_FOR_FIVE_PERCENT) { - percentOfNumberOfRows = percentOfNumberOfRows/2; + int percentOfNumberOfRows = (rowToBeElaborated / 10) + 1; + if(rowToBeElaborated >= THRESHOLD_FOR_FIVE_PERCENT) { + percentOfNumberOfRows = percentOfNumberOfRows / 2; } - int elaborated = 0; String line; // Read File Line By Line - while ((line = br.readLine()) != null) { - elaborateLine(line); + while((line = br.readLine()) != null) { + boolean elaborate = true; + long delay = TimeUnit.MILLISECONDS.toMillis(100); + int i = 0; + while(elaborate) { + ++i; + try { + elaborateLine(line); + elaborate = false; + } catch(Exception e) { + logger.debug("Elaboration of line {} failed due to {}. Retrying in {} {}", line, e.getMessage(), + delay, TimeUnit.MILLISECONDS.name().toLowerCase()); + if(i < MAX_RETRY) { + TimeUnit.MILLISECONDS.sleep(delay); + delay = delay * 2; + } else { + // elaborate = false; // This is not needed but it is added to improve code readability + throw e; + } + } + } + ++elaborated; - if(elaborated % percentOfNumberOfRows == 0){ - int elaboratedPercentage = elaborated*100/rowToBeElaborated; - logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(), elaborated, rowToBeElaborated, elaboratedPercentage); + if(elaborated % percentOfNumberOfRows == 0) { + int elaboratedPercentage = elaborated * 100 / rowToBeElaborated; + logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(), + elaborated, rowToBeElaborated, elaboratedPercentage); } - if(elaborated>rowToBeElaborated){ - br.close(); - in.close(); - fstream.close(); - throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated + " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); + if(elaborated > rowToBeElaborated) { + throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated + + " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); } } - if(elaborated!=rowToBeElaborated){ - br.close(); - in.close(); - fstream.close(); - throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated + "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); + if(elaborated != rowToBeElaborated) { + throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated + + "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); } - logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated, rowToBeElaborated, 100); + logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated, + rowToBeElaborated, 100); - br.close(); - in.close(); - fstream.close(); - - } catch (Exception e) { + } catch(Exception e) { logger.error("Error while elaborating file {}", file.getAbsolutePath(), e); throw e; + } finally { + if(br != null) { + br.close(); + } + if(in != null) { + in.close(); + } + if(fstream != null) { + fstream.close(); + } + } - + } - - public void elaborate() throws Exception{ + + public void elaborate() throws Exception { startTime = Utility.getUTCCalendarInstance(); readFile(); aggregationStatus.setAggregationState(finalAggregationState, startTime, true);