Fixed synchronization issue on monitor which retry persistence

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@124382 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2016-02-19 15:14:55 +00:00
parent 2b0ee63840
commit 947a3efc29
3 changed files with 29 additions and 23 deletions

View File

@ -94,7 +94,6 @@ public abstract class PersistenceBackendFactory {
context = sanitizeContext(context);
String slashLessContext = removeSlashFromContext(context);
File fallbackFile = new File(getFallbackLocation(), String.format("%s.%s", slashLessContext, FALLBACK_FILENAME));
fallbackFile = new File(getFallbackLocation(), FALLBACK_FILENAME);
return fallbackFile;
}

View File

@ -33,10 +33,14 @@ public class PersistenceBackendMonitor implements Runnable {
protected final PersistenceBackend persistenceBackend;
public final static int INITIAL_DELAY = 1;
public final static int DELAY = 10;
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
public PersistenceBackendMonitor(PersistenceBackend persistenceBackend){
this.persistenceBackend = persistenceBackend;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TimeUnit.MINUTES);
}
protected void elaborateFile(File elaborationFile){
@ -51,14 +55,14 @@ public class PersistenceBackendMonitor implements Runnable {
try {
fallbackPersistenceBackend.printLine(line);
} catch (Exception e1) {
logger.error("Was not possible Line {} will be lost", line, e1);
logger.error("Line {} will be lost", line, e1);
}
}
}
} catch (FileNotFoundException e) {
logger.error("", e);
logger.error("File non trovato", e);
} catch (IOException e) {
logger.error("", e);
logger.error("IOException", e);
}
}
@ -80,29 +84,25 @@ public class PersistenceBackendMonitor implements Runnable {
}
protected void elaborateFallbackFile(File file){
protected synchronized void elaborateFallbackFile(File file){
File elaborationFile = null;
synchronized (file) {
if(file.exists()){
Long timestamp = Calendar.getInstance().getTimeInMillis();
elaborationFile = new File(file.getAbsolutePath(), ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
file.renameTo(elaborationFile);
}
if(file.exists()){
Long timestamp = Calendar.getInstance().getTimeInMillis();
elaborationFile = new File(file.getAbsolutePath() + ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
file.renameTo(elaborationFile);
}
if(elaborationFile!=null){
synchronized (elaborationFile) {
elaborateFile(elaborationFile);
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
}
elaborateFile(elaborationFile);
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.debug("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
}
}
}

View File

@ -37,12 +37,16 @@ public abstract class AggregationScheduler implements Runnable {
protected final PersistenceExecutor persistenceExecutor;
protected final ScheduledExecutorService scheduler;
public final static int INITIAL_DELAY = 10;
public final static int DELAY = 10;
public final static TimeUnit TIME_UNIT = TimeUnit.MINUTES;
protected AggregationScheduler(PersistenceExecutor persistenceExecutor){
this.bufferedRecords = new HashMap<String, List<Record>>();
this.totalBufferedRecords = 0;
this.persistenceExecutor = persistenceExecutor;
this.scheduler = Executors.newScheduledThreadPool(1);
this.scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.MINUTES);
this.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
}
@SuppressWarnings("rawtypes")
@ -150,6 +154,9 @@ public abstract class AggregationScheduler implements Runnable {
}
protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception{
if(totalBufferedRecords==0){
return;
}
Record[] recordToPersist = new Record[totalBufferedRecords];
int i = 0;
Collection<List<Record>> values = bufferedRecords.values();