diff --git a/pom.xml b/pom.xml
index b71106b..d4be20b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,18 +32,27 @@
scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}
https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}
+
+
+
+
+ org.gcube.distribution
+ gcube-bom
+ LATEST
+ pom
+ import
+
+
+
org.slf4j
slf4j-api
- 1.7.5
- provided
com.fasterxml.jackson.core
jackson-databind
- 2.6.0
@@ -59,7 +68,6 @@
test
-
diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java b/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java
index 61e8a1e..25a95a0 100644
--- a/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java
+++ b/src/main/java/org/gcube/documentstore/persistence/FallbackMonitor.java
@@ -5,9 +5,7 @@ package org.gcube.documentstore.persistence;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileReader;
-import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
@@ -46,79 +44,71 @@ public class FallbackMonitor implements Runnable {
}
}
- protected void elaborateFile(File elaborationFile){
+ protected void elaborateFile(File elaborationFile) {
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
Record record = RecordUtility.getRecord(line);
persistenceBackend.accountWithFallback(record);
- } catch(Exception e){
+ } catch(Throwable e){
logger.error("Was not possible parse line {} to obtain a valid Record. Going to writing back this line as string fallback file.", line, e);
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
try {
fallbackPersistenceBackend.printLine(line);
- } catch (Exception e1) {
+ } catch (Throwable e1) {
logger.error("Line {} will be lost", line, e1);
}
}
}
- } catch (FileNotFoundException e) {
- logger.error("File non trovato", e);
- } catch (IOException e) {
- logger.error("IOException", e);
+ } catch (Throwable e) {
+ logger.error("Error elaborating {}", elaborationFile.getAbsoluteFile(), e);
}
}
@Deprecated
protected void manageOldAccountingFile(){
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
- File newFile = fallbackPersistenceBackend.getFallbackFile();
-
- String oldAccountingFileName = newFile.getName();
- int lastIndexOf_ = oldAccountingFileName.lastIndexOf("_");
-
- oldAccountingFileName = oldAccountingFileName.substring(lastIndexOf_+1);
- oldAccountingFileName = oldAccountingFileName.replace(
- PersistenceBackendFactory.FALLBACK_FILENAME, "accountingFallback.log");
-
-
- File oldAccountingFile = new File(newFile.getParentFile(), oldAccountingFileName);
- elaborateFallbackFile(oldAccountingFile);
+ elaborateFallbackFile(fallbackPersistenceBackend.getOldFallbackFile());
}
- protected synchronized void elaborateFallbackFile(File file){
- logger.trace("Trying to persist {}s which failed and were persisted using fallback on file {}",
- Record.class.getSimpleName(), file.getAbsoluteFile());
- File elaborationFile = null;
-
- if(file.exists()){
- Long timestamp = Calendar.getInstance().getTimeInMillis();
- elaborationFile = new File(file.getAbsolutePath() + ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
- logger.trace("Going to move fallaback file ({}) to elaboration file ({})",
- file.getAbsolutePath(), elaborationFile.getAbsolutePath());
- file.renameTo(elaborationFile);
-
- }
-
- if(elaborationFile!=null){
- elaborateFile(elaborationFile);
- boolean deleted = elaborationFile.delete();
- if(!deleted){
- logger.trace("Failed to delete file {}", elaborationFile.getAbsolutePath());
- File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
- elaborationFile.renameTo(elaborationFileNotDeleted);
+ protected void elaborateFallbackFile(File elaborationFile){
+ if(elaborationFile!=null && elaborationFile.exists()){
+ try {
+ elaborateFile(elaborationFile);
+ }finally {
+ if(elaborationFile.exists()) {
+ boolean deleted = elaborationFile.delete();
+ if(!deleted){
+ logger.error("Failed to delete file {}", elaborationFile.getAbsolutePath());
+ File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
+ elaborationFile.renameTo(elaborationFileNotDeleted);
+ }
+ }else {
+ logger.error("File {} does not exists. This is really starge and should not occur. Please contact the administrator.", elaborationFile.getAbsolutePath());
+ }
+
}
}
+ }
+
+ protected void elaborateFallbackFile(){
+ FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
+ File file = fallbackPersistenceBackend.getFallbackFile();
+
+ logger.trace("Trying to persist {}s which were persisted using fallback on file {}",
+ Record.class.getSimpleName(), file.getAbsoluteFile());
+
+ Long timestamp = Calendar.getInstance().getTimeInMillis();
+ File elaborationFile = fallbackPersistenceBackend.moveFallbackFile(ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
+
+ elaborateFallbackFile(elaborationFile);
}
@Override
public void run() {
- //logger.trace("TestingThread PersistenceBackendMonitor run");
- FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
- File file = new File(fallbackPersistenceBackend.getFallbackFile().getAbsolutePath());
- elaborateFallbackFile(file);
+ elaborateFallbackFile();
manageOldAccountingFile();
}
diff --git a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java
index cc97488..ba622b6 100644
--- a/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java
+++ b/src/main/java/org/gcube/documentstore/persistence/FallbackPersistenceBackend.java
@@ -8,6 +8,9 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
@@ -27,7 +30,45 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
* @return the fallbackFile
*/
protected File getFallbackFile() {
- return fallbackFile;
+ synchronized(fallbackFile) {
+ return fallbackFile;
+ }
+ }
+
+ @Deprecated
+ protected File getOldFallbackFile() {
+ synchronized(fallbackFile) {
+ String oldAccountingFileName = fallbackFile.getName();
+ int lastIndexOf_ = oldAccountingFileName.lastIndexOf("_");
+
+ oldAccountingFileName = oldAccountingFileName.substring(lastIndexOf_+1);
+ oldAccountingFileName = oldAccountingFileName.replace(
+ PersistenceBackendFactory.FALLBACK_FILENAME, "accountingFallback.log");
+
+ return new File(fallbackFile.getParentFile(), oldAccountingFileName);
+ }
+ }
+
+ /**
+ * Move the fallbackFile to a new file with the same name by appending a suffix
+ * @param suffix
+ * @return the move file
+ */
+ protected File moveFallbackFile(String suffix) {
+ synchronized(fallbackFile) {
+ try {
+ Path source = fallbackFile.toPath();
+ Path target = source.resolveSibling(fallbackFile.getName()+suffix);
+ logger.trace("Going to move fallback file {} to {}", source.toAbsolutePath().toString(), target.toAbsolutePath().toString());
+ target = Files.move(source, source.resolveSibling(fallbackFile.getName()+suffix), StandardCopyOption.ATOMIC_MOVE);
+ return target.toFile();
+ } catch (Exception e) {
+ String absPath = fallbackFile.getAbsolutePath();
+ String error = String.format("It was not possibile to move %s to %s%s.", absPath, absPath, suffix);
+ logger.error(error, e);
+ return null;
+ }
+ }
}
protected FallbackPersistenceBackend(File fallbackFile) {
@@ -56,23 +97,42 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
// Nothing TO DO
}
-
+ @Override
+ protected void accountWithFallback(Record... records) throws Exception {
+ for (Record record : records) {
+ try {
+ reallyAccount(record);
+ logger.trace("{} accounted succesfully from {}", record.toString(), FallbackPersistenceBackend.class.getSimpleName());
+ } catch (Throwable th) {
+ logger.error("{} was not accounted at all", record.toString(), th);
+ }
+ }
+ }
/**
* {@inheritDoc}
- * This method is synchronized on {@link File} used, so any actions which
- * has to modify, rename or delete the file must be synchronized on this
- * file. To retrieve it use {@link #getFallbackFile()} method.
- * This is intended for internal library usage only so that is protected
*/
@Override
- protected void reallyAccount(Record record) throws Exception {
- String marshalled = DSMapper.marshal(record);
- logger.debug("reallyAccount:{}",marshalled);
- printLine(marshalled);
+ protected void reallyAccount(Record record) {
+ String marshalled = null;
+ try {
+ marshalled = DSMapper.marshal(record);
+ printLine(marshalled);
+ logger.trace("{} accounted succesfully from {}", marshalled, this.getClass().getSimpleName());
+ } catch(Throwable th) {
+ logger.error("{} was not accounted at all", marshalled!=null ? marshalled : record.toString(), th);
+ }
}
-
- public void printLine(String line) throws Exception {
+
+ /**
+ * {@inheritDoc}
+ * This method is synchronized on fallbackFile. Any actions which
+ * has to modify, rename or delete the file must be synchronized on this
+ * file. To retrieve it use {@link #getFallbackFile()} method.
+ * This is intended for internal library usage only so that is protected.
+ * It
+ */
+ protected void printLine(String line) throws Exception {
synchronized (fallbackFile) {
try(FileWriter fw = new FileWriter(fallbackFile, true);
BufferedWriter bw = new BufferedWriter(fw);
diff --git a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java
index 46b0a1c..c8f6dbf 100644
--- a/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java
+++ b/src/main/java/org/gcube/documentstore/persistence/PersistenceBackend.java
@@ -141,6 +141,63 @@ public abstract class PersistenceBackend {
*/
protected abstract void reallyAccount(Record record) throws Exception;
+ /**
+ * The function check if the use of fallback is forced and use it if any.
+ * The function always return the value of forceFallbackUse even if after using it otherwise the record is
+ * accounted two times in case the forced time is terminated
+ * @param record the rEcord to account with fallback if any
+ * @return
+ * @throws Exception
+ */
+ private synchronized boolean isFallbackForced() throws Exception {
+ if (forceFallbackUse) {
+ String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
+
+ long now = Calendar.getInstance().getTimeInMillis();
+ long diff = now - fallbackUseStartTime;
+ logger.trace("{} forced use started at {}. {} seconds were elapsed",
+ fallbackPersistenceName, fallbackUseStartTime, (long) diff/1000);
+
+ if (diff > MAX_TIME_TO_FALLBACK) {
+ logger.info("The time to force the usage of {} is terminated. Trying to restore the use of {}",
+ fallbackPersistenceName, this.getClass().getSimpleName());
+ forceFallbackUse = false;
+ fallbackUseCounter = 0;
+ fallbackUseStartTime = 0;
+ }
+
+ }
+ return forceFallbackUse;
+ }
+
+ private synchronized void registerUseOfFallback() throws Exception {
+ /*
+ * This if could be removed because the function id only used from accountWithFallback() which is overrode
+ * in FallbackPersistenceBackend.
+ * Anyway it is keep to avoid future error if the function will be used in others functions.
+ * */
+ if (!(this instanceof FallbackPersistenceBackend) ){
+ String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
+
+ fallbackUseCounter++;
+
+ logger.trace("Exception number is {}. Max Retry number is {}. After that the use of {} will be forced",
+ fallbackUseCounter, MAX_FALLBACK_RETRY, fallbackPersistenceName);
+
+ if (fallbackUseCounter == MAX_FALLBACK_RETRY) {
+ forceFallbackUse = true;
+ fallbackUseStartTime = Calendar.getInstance().getTimeInMillis();
+ logger.info("Going to force {} for too many Exceptions", fallbackPersistenceName);
+
+ // aggregationScheduler.flush(new DefaultPersitenceExecutor(fallbackPersistence));
+
+ this.close();
+ this.clean();
+ }
+ }
+ }
+
+
/***
*
* @param records
@@ -157,62 +214,23 @@ public abstract class PersistenceBackend {
try {
recordString = record.toString();
- if (forceFallbackUse) {
-
- logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, recordString);
+ if (isFallbackForced()) {
+ logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, record.toString());
fallbackPersistence.reallyAccount(record);
-
- long now = Calendar.getInstance().getTimeInMillis();
- long diff = now - fallbackUseStartTime;
- logger.trace("{} forced use started at {}. {} seconds were elapsed",
- fallbackPersistenceName, fallbackUseStartTime, (long) diff/1000);
-
- if (diff > MAX_TIME_TO_FALLBACK) {
- logger.debug("The time to force the usage of {} is terminated. Trying to restore the use of {}",
- fallbackPersistenceName, persistenceName);
- forceFallbackUse = false;
- fallbackUseCounter = 0;
- fallbackUseStartTime = 0;
- }
-
} else {
this.reallyAccount(record);
- logger.trace("{} accounted succesfully from {}.", record.toString(), persistenceName);
+ logger.trace("{} accounted succesfully from {}.", recordString, persistenceName);
}
} catch (Throwable t) {
-
try {
logger.warn("{} was not accounted succesfully using {}. Trying to use {}.", recordString,
persistenceName, fallbackPersistenceName, t);
-
fallbackPersistence.reallyAccount(record);
-
- logger.trace("{} accounted succesfully from {}", recordString, fallbackPersistenceName);
- } catch (Throwable th) {
- logger.error("{} was not accounted at all", recordString, t);
+ }finally {
+ registerUseOfFallback();
}
- if (!(this instanceof FallbackPersistenceBackend) ){
- // && (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
-
- fallbackUseCounter++;
-
- logger.warn("Exception number is {}. Max Retry number is {}. After that the use of {} will be forced",
- fallbackUseCounter, MAX_FALLBACK_RETRY, fallbackPersistenceName);
-
- if (fallbackUseCounter == MAX_FALLBACK_RETRY) {
- forceFallbackUse = true;
- fallbackUseStartTime = Calendar.getInstance().getTimeInMillis();
- logger.trace("Going to force {} for too many Exceptions",
- fallbackPersistenceName);
-
- aggregationScheduler.flush(new DefaultPersitenceExecutor(fallbackPersistence));
-
- this.close();
- this.clean();
- }
- }
}
}