Refs #11258 Added retry when inserting or deleting documents

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/accounting/accounting-aggregator-se-plugin@164525 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2018-02-22 14:31:35 +00:00
parent 1d688f6aa8
commit f0332ebd4d
1 changed files with 65 additions and 34 deletions

View File

@ -6,6 +6,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
@ -20,13 +21,15 @@ import com.couchbase.client.java.Bucket;
* @author Luca Frosini (ISTI - CNR)
*/
public abstract class DocumentElaboration {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final String ID = Record.ID;
protected static final int THRESHOLD_FOR_FIVE_PERCENT = 100000;
public static final int MAX_RETRY = 8;
protected final AggregationStatus aggregationStatus;
protected final File file;
protected final Bucket bucket;
@ -36,7 +39,8 @@ public abstract class DocumentElaboration {
protected Calendar startTime;
protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file, Bucket bucket, int rowToBeElaborated){
protected DocumentElaboration(AggregationStatus statusManager, AggregationState finalAggregationState, File file,
Bucket bucket, int rowToBeElaborated) {
this.aggregationStatus = statusManager;
this.finalAggregationState = finalAggregationState;
this.file = file;
@ -45,59 +49,86 @@ public abstract class DocumentElaboration {
}
protected void readFile() throws Exception {
FileInputStream fstream = null;
DataInputStream in = null;
BufferedReader br = null;
try {
// Open the file that is the first // command line parameter
FileInputStream fstream = new FileInputStream(file);
fstream = new FileInputStream(file);
// Get the object of DataInputStream
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
in = new DataInputStream(fstream);
br = new BufferedReader(new InputStreamReader(in));
logger.info("{} - Going to elaborate {} rows", aggregationStatus.getAggregationInfo(), rowToBeElaborated);
int percentOfNumberOfRows = (rowToBeElaborated/10)+1;
if(rowToBeElaborated>=THRESHOLD_FOR_FIVE_PERCENT) {
percentOfNumberOfRows = percentOfNumberOfRows/2;
int percentOfNumberOfRows = (rowToBeElaborated / 10) + 1;
if(rowToBeElaborated >= THRESHOLD_FOR_FIVE_PERCENT) {
percentOfNumberOfRows = percentOfNumberOfRows / 2;
}
int elaborated = 0;
String line;
// Read File Line By Line
while ((line = br.readLine()) != null) {
elaborateLine(line);
while((line = br.readLine()) != null) {
boolean elaborate = true;
long delay = TimeUnit.MILLISECONDS.toMillis(100);
int i = 0;
while(elaborate) {
++i;
try {
elaborateLine(line);
elaborate = false;
} catch(Exception e) {
logger.debug("Elaboration of line {} failed due to {}. Retrying in {} {}", line, e.getMessage(),
delay, TimeUnit.MILLISECONDS.name().toLowerCase());
if(i < MAX_RETRY) {
TimeUnit.MILLISECONDS.sleep(delay);
delay = delay * 2;
} else {
// elaborate = false; // This is not needed but it is added to improve code readability
throw e;
}
}
}
++elaborated;
if(elaborated % percentOfNumberOfRows == 0){
int elaboratedPercentage = elaborated*100/rowToBeElaborated;
logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(), elaborated, rowToBeElaborated, elaboratedPercentage);
if(elaborated % percentOfNumberOfRows == 0) {
int elaboratedPercentage = elaborated * 100 / rowToBeElaborated;
logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(),
elaborated, rowToBeElaborated, elaboratedPercentage);
}
if(elaborated>rowToBeElaborated){
br.close();
in.close();
fstream.close();
throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated + " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
if(elaborated > rowToBeElaborated) {
throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated
+ " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
}
}
if(elaborated!=rowToBeElaborated){
br.close();
in.close();
fstream.close();
throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated + "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
if(elaborated != rowToBeElaborated) {
throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated
+ "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
}
logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated, rowToBeElaborated, 100);
logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated,
rowToBeElaborated, 100);
br.close();
in.close();
fstream.close();
} catch (Exception e) {
} catch(Exception e) {
logger.error("Error while elaborating file {}", file.getAbsolutePath(), e);
throw e;
} finally {
if(br != null) {
br.close();
}
if(in != null) {
in.close();
}
if(fstream != null) {
fstream.close();
}
}
}
public void elaborate() throws Exception{
public void elaborate() throws Exception {
startTime = Utility.getUTCCalendarInstance();
readFile();
aggregationStatus.setAggregationState(finalAggregationState, startTime, true);