Refs #10677: Force use of fallback causes duplication of records in fallback files

Task-Url: https://support.d4science.org/issues/10677

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/document-store-lib@160446 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Luca Frosini 2017-12-13 15:59:00 +00:00
parent 51840ce06d
commit 51f952c441
4 changed files with 182 additions and 106 deletions

16
pom.xml
View File

@ -32,18 +32,27 @@
<developerConnection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}</developerConnection>
<url>https://svn.d4science.research-infrastructures.eu/gcube/trunk/data-publishing/${project.artifactId}</url>
</scm>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.gcube.distribution</groupId>
<artifactId>gcube-bom</artifactId>
<version>LATEST</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.0</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
@ -59,7 +68,6 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>

View File

@ -5,9 +5,7 @@ package org.gcube.documentstore.persistence;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
@ -46,79 +44,71 @@ public class FallbackMonitor implements Runnable {
}
}
protected void elaborateFile(File elaborationFile){
protected void elaborateFile(File elaborationFile) {
try(BufferedReader br = new BufferedReader(new FileReader(elaborationFile))) {
for(String line; (line = br.readLine()) != null; ) {
try {
Record record = RecordUtility.getRecord(line);
persistenceBackend.accountWithFallback(record);
} catch(Exception e){
} catch(Throwable e){
logger.error("Was not possible parse line {} to obtain a valid Record. Going to writing back this line as string fallback file.", line, e);
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
try {
fallbackPersistenceBackend.printLine(line);
} catch (Exception e1) {
} catch (Throwable e1) {
logger.error("Line {} will be lost", line, e1);
}
}
}
} catch (FileNotFoundException e) {
logger.error("File non trovato", e);
} catch (IOException e) {
logger.error("IOException", e);
} catch (Throwable e) {
logger.error("Error elaborating {}", elaborationFile.getAbsoluteFile(), e);
}
}
@Deprecated
protected void manageOldAccountingFile(){
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
File newFile = fallbackPersistenceBackend.getFallbackFile();
String oldAccountingFileName = newFile.getName();
int lastIndexOf_ = oldAccountingFileName.lastIndexOf("_");
oldAccountingFileName = oldAccountingFileName.substring(lastIndexOf_+1);
oldAccountingFileName = oldAccountingFileName.replace(
PersistenceBackendFactory.FALLBACK_FILENAME, "accountingFallback.log");
File oldAccountingFile = new File(newFile.getParentFile(), oldAccountingFileName);
elaborateFallbackFile(oldAccountingFile);
elaborateFallbackFile(fallbackPersistenceBackend.getOldFallbackFile());
}
protected synchronized void elaborateFallbackFile(File file){
logger.trace("Trying to persist {}s which failed and were persisted using fallback on file {}",
Record.class.getSimpleName(), file.getAbsoluteFile());
File elaborationFile = null;
if(file.exists()){
Long timestamp = Calendar.getInstance().getTimeInMillis();
elaborationFile = new File(file.getAbsolutePath() + ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
logger.trace("Going to move fallaback file ({}) to elaboration file ({})",
file.getAbsolutePath(), elaborationFile.getAbsolutePath());
file.renameTo(elaborationFile);
}
if(elaborationFile!=null){
elaborateFile(elaborationFile);
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.trace("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
protected void elaborateFallbackFile(File elaborationFile){
if(elaborationFile!=null && elaborationFile.exists()){
try {
elaborateFile(elaborationFile);
}finally {
if(elaborationFile.exists()) {
boolean deleted = elaborationFile.delete();
if(!deleted){
logger.error("Failed to delete file {}", elaborationFile.getAbsolutePath());
File elaborationFileNotDeleted = new File(elaborationFile.getAbsolutePath()+ELABORATION_FILE_NOT_DELETED_SUFFIX);
elaborationFile.renameTo(elaborationFileNotDeleted);
}
}else {
logger.error("File {} does not exists. This is really starge and should not occur. Please contact the administrator.", elaborationFile.getAbsolutePath());
}
}
}
}
protected void elaborateFallbackFile(){
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
File file = fallbackPersistenceBackend.getFallbackFile();
logger.trace("Trying to persist {}s which were persisted using fallback on file {}",
Record.class.getSimpleName(), file.getAbsoluteFile());
Long timestamp = Calendar.getInstance().getTimeInMillis();
File elaborationFile = fallbackPersistenceBackend.moveFallbackFile(ELABORATION_FILE_SUFFIX + "." + timestamp.toString());
elaborateFallbackFile(elaborationFile);
}
@Override
public void run() {
//logger.trace("TestingThread PersistenceBackendMonitor run");
FallbackPersistenceBackend fallbackPersistenceBackend = persistenceBackend.getFallbackPersistence();
File file = new File(fallbackPersistenceBackend.getFallbackFile().getAbsolutePath());
elaborateFallbackFile(file);
elaborateFallbackFile();
manageOldAccountingFile();
}

View File

@ -8,6 +8,9 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record;
@ -27,7 +30,45 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
* @return the fallbackFile
*/
protected File getFallbackFile() {
return fallbackFile;
synchronized(fallbackFile) {
return fallbackFile;
}
}
@Deprecated
protected File getOldFallbackFile() {
synchronized(fallbackFile) {
String oldAccountingFileName = fallbackFile.getName();
int lastIndexOf_ = oldAccountingFileName.lastIndexOf("_");
oldAccountingFileName = oldAccountingFileName.substring(lastIndexOf_+1);
oldAccountingFileName = oldAccountingFileName.replace(
PersistenceBackendFactory.FALLBACK_FILENAME, "accountingFallback.log");
return new File(fallbackFile.getParentFile(), oldAccountingFileName);
}
}
/**
* Move the fallbackFile to a new file with the same name by appending a suffix
* @param suffix
* @return the move file
*/
protected File moveFallbackFile(String suffix) {
synchronized(fallbackFile) {
try {
Path source = fallbackFile.toPath();
Path target = source.resolveSibling(fallbackFile.getName()+suffix);
logger.trace("Going to move fallback file {} to {}", source.toAbsolutePath().toString(), target.toAbsolutePath().toString());
target = Files.move(source, source.resolveSibling(fallbackFile.getName()+suffix), StandardCopyOption.ATOMIC_MOVE);
return target.toFile();
} catch (Exception e) {
String absPath = fallbackFile.getAbsolutePath();
String error = String.format("It was not possibile to move %s to %s%s.", absPath, absPath, suffix);
logger.error(error, e);
return null;
}
}
}
protected FallbackPersistenceBackend(File fallbackFile) {
@ -56,23 +97,42 @@ public class FallbackPersistenceBackend extends PersistenceBackend {
// Nothing TO DO
}
@Override
protected void accountWithFallback(Record... records) throws Exception {
for (Record record : records) {
try {
reallyAccount(record);
logger.trace("{} accounted succesfully from {}", record.toString(), FallbackPersistenceBackend.class.getSimpleName());
} catch (Throwable th) {
logger.error("{} was not accounted at all", record.toString(), th);
}
}
}
/**
* {@inheritDoc}
* This method is synchronized on {@link File} used, so any actions which
* has to modify, rename or delete the file must be synchronized on this
* file. To retrieve it use {@link #getFallbackFile()} method.
* This is intended for internal library usage only so that is protected
*/
@Override
protected void reallyAccount(Record record) throws Exception {
String marshalled = DSMapper.marshal(record);
logger.debug("reallyAccount:{}",marshalled);
printLine(marshalled);
protected void reallyAccount(Record record) {
String marshalled = null;
try {
marshalled = DSMapper.marshal(record);
printLine(marshalled);
logger.trace("{} accounted succesfully from {}", marshalled, this.getClass().getSimpleName());
} catch(Throwable th) {
logger.error("{} was not accounted at all", marshalled!=null ? marshalled : record.toString(), th);
}
}
public void printLine(String line) throws Exception {
/**
* {@inheritDoc}
* This method is synchronized on fallbackFile. Any actions which
* has to modify, rename or delete the file must be synchronized on this
* file. To retrieve it use {@link #getFallbackFile()} method.
* This is intended for internal library usage only so that is protected.
* It
*/
protected void printLine(String line) throws Exception {
synchronized (fallbackFile) {
try(FileWriter fw = new FileWriter(fallbackFile, true);
BufferedWriter bw = new BufferedWriter(fw);

View File

@ -141,6 +141,63 @@ public abstract class PersistenceBackend {
*/
protected abstract void reallyAccount(Record record) throws Exception;
/**
* The function check if the use of fallback is forced and use it if any.
* The function always return the value of forceFallbackUse even if after using it otherwise the record is
* accounted two times in case the forced time is terminated
* @param record the rEcord to account with fallback if any
* @return
* @throws Exception
*/
private synchronized boolean isFallbackForced() throws Exception {
if (forceFallbackUse) {
String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
long now = Calendar.getInstance().getTimeInMillis();
long diff = now - fallbackUseStartTime;
logger.trace("{} forced use started at {}. {} seconds were elapsed",
fallbackPersistenceName, fallbackUseStartTime, (long) diff/1000);
if (diff > MAX_TIME_TO_FALLBACK) {
logger.info("The time to force the usage of {} is terminated. Trying to restore the use of {}",
fallbackPersistenceName, this.getClass().getSimpleName());
forceFallbackUse = false;
fallbackUseCounter = 0;
fallbackUseStartTime = 0;
}
}
return forceFallbackUse;
}
private synchronized void registerUseOfFallback() throws Exception {
/*
* This if could be removed because the function id only used from accountWithFallback() which is overrode
* in FallbackPersistenceBackend.
* Anyway it is keep to avoid future error if the function will be used in others functions.
* */
if (!(this instanceof FallbackPersistenceBackend) ){
String fallbackPersistenceName = FallbackPersistenceBackend.class.getSimpleName();
fallbackUseCounter++;
logger.trace("Exception number is {}. Max Retry number is {}. After that the use of {} will be forced",
fallbackUseCounter, MAX_FALLBACK_RETRY, fallbackPersistenceName);
if (fallbackUseCounter == MAX_FALLBACK_RETRY) {
forceFallbackUse = true;
fallbackUseStartTime = Calendar.getInstance().getTimeInMillis();
logger.info("Going to force {} for too many Exceptions", fallbackPersistenceName);
// aggregationScheduler.flush(new DefaultPersitenceExecutor(fallbackPersistence));
this.close();
this.clean();
}
}
}
/***
*
* @param records
@ -157,62 +214,23 @@ public abstract class PersistenceBackend {
try {
recordString = record.toString();
if (forceFallbackUse) {
logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, recordString);
if (isFallbackForced()) {
logger.trace("Forcing the use of {} to account {}", fallbackPersistenceName, record.toString());
fallbackPersistence.reallyAccount(record);
long now = Calendar.getInstance().getTimeInMillis();
long diff = now - fallbackUseStartTime;
logger.trace("{} forced use started at {}. {} seconds were elapsed",
fallbackPersistenceName, fallbackUseStartTime, (long) diff/1000);
if (diff > MAX_TIME_TO_FALLBACK) {
logger.debug("The time to force the usage of {} is terminated. Trying to restore the use of {}",
fallbackPersistenceName, persistenceName);
forceFallbackUse = false;
fallbackUseCounter = 0;
fallbackUseStartTime = 0;
}
} else {
this.reallyAccount(record);
logger.trace("{} accounted succesfully from {}.", record.toString(), persistenceName);
logger.trace("{} accounted succesfully from {}.", recordString, persistenceName);
}
} catch (Throwable t) {
try {
logger.warn("{} was not accounted succesfully using {}. Trying to use {}.", recordString,
persistenceName, fallbackPersistenceName, t);
fallbackPersistence.reallyAccount(record);
logger.trace("{} accounted succesfully from {}", recordString, fallbackPersistenceName);
} catch (Throwable th) {
logger.error("{} was not accounted at all", recordString, t);
}finally {
registerUseOfFallback();
}
if (!(this instanceof FallbackPersistenceBackend) ){
// && (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
fallbackUseCounter++;
logger.warn("Exception number is {}. Max Retry number is {}. After that the use of {} will be forced",
fallbackUseCounter, MAX_FALLBACK_RETRY, fallbackPersistenceName);
if (fallbackUseCounter == MAX_FALLBACK_RETRY) {
forceFallbackUse = true;
fallbackUseStartTime = Calendar.getInstance().getTimeInMillis();
logger.trace("Going to force {} for too many Exceptions",
fallbackPersistenceName);
aggregationScheduler.flush(new DefaultPersitenceExecutor(fallbackPersistence));
this.close();
this.clean();
}
}
}
}