This commit is contained in:
Fabio Sinibaldi 2019-01-24 14:57:24 +00:00
parent 4e27039e1d
commit 37029e9e2d
6 changed files with 60 additions and 9 deletions

View File

@@ -6,7 +6,10 @@ import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;
import javax.inject.Inject;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.impl.Queries;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
@@ -24,6 +27,9 @@ public class ImporterMonitor implements DMMonitorListener {
private static final Logger log= LoggerFactory.getLogger(ImporterMonitor.class);
@Inject
private PerformanceManager performance;
public ImporterMonitor(ImportRoutineDescriptor routine) {
super();
this.routine = routine;
@@ -31,7 +37,6 @@ public class ImporterMonitor implements DMMonitorListener {
private ImportRoutineDescriptor routine;
private ISQueryDescriptor isQuery;
@Override
public void accepted() {
@@ -46,7 +51,11 @@ public class ImporterMonitor implements DMMonitorListener {
@Override
public void complete(double percentage) {
updateStatus(ImportStatus.COMPLETE,routine);
loadOutputData(routine,isQuery);
try{
performance.loadOutputData(routine);
}catch(Throwable t) {
updateStatus(ImportStatus.FAILED,routine);
}
}
@Override
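Note: complete() now delegates output loading to the injected PerformanceManager and demotes the routine to FAILED on any error, but the catch block drops the Throwable silently. A minimal variant of the same hook that also logs the cause (the log call is illustrative; everything else is taken from this diff):

@Override
public void complete(double percentage) {
	updateStatus(ImportStatus.COMPLETE,routine);
	try{
		performance.loadOutputData(routine);
	}catch(Throwable t) {
		// Surface the cause before marking the routine as failed
		log.error("Unable to load output data of {} ",routine,t);
		updateStatus(ImportStatus.FAILED,routine);
	}
}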

View File

@@ -6,8 +6,10 @@ import java.io.IOException;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,6 +27,7 @@ import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
@@ -77,7 +80,9 @@ public class PerformanceManagerImpl implements PerformanceManager{
for(Entry<String,String> entry:outputs.entrySet()) {
parse(entry.getValue(),entry.getKey(),desc,conn);
}
log.info("IMPORTED ALL FILES for {} ",desc);
log.info("IMPORTED ALL FILES for {}, gonna clean previous routines output. ",desc);
removeOlderEquivalents(desc, conn);
conn.commit();
}finally {
conn.close();
@@ -135,7 +140,29 @@ public class PerformanceManagerImpl implements PerformanceManager{
return analysisConfiguration.get(request.getType());
}
private static final void removeOlderEquivalents(ImportRoutineDescriptor last,Connection conn) throws SQLException, InvalidRequestException {
log.debug("Removing imports replaced by {} ",last);
DBQueryDescriptor desc=new DBQueryDescriptor().
add(DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),last.getBatch_type()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),last.getSourceUrl()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.END),new Timestamp(last.getEndTime().toEpochMilli()));
ResultSet rsEquivalents=Queries.GET_OLDER_EQUIVALENT_IMPORT_ROUTINE.get(conn, desc).executeQuery();
while(rsEquivalents.next()) {
ImportRoutineDescriptor older=Queries.rowToDescriptor(rsEquivalents);
log.debug("Removing outputs from {} ",older);
AnalysisType type=new AnalysisType(older);
for(ImportedTable table:analysisConfiguration.get(type)) {
log.debug("Cleaning {} of {} outputs",table.getTableName(),older);
table.cleanByImportRoutine(older,conn);
}
}
}
private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException, InvalidRequestException {
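Note: removeOlderEquivalents leaves the PreparedStatement and ResultSet open on the shared connection. A sketch of the same loop with try-with-resources, assuming only the Query.get(Connection, DBQueryDescriptor) helper and Queries.rowToDescriptor shown in this diff:

try(PreparedStatement ps=Queries.GET_OLDER_EQUIVALENT_IMPORT_ROUTINE.get(conn, desc);
		ResultSet rsEquivalents=ps.executeQuery()){
	while(rsEquivalents.next()) {
		ImportRoutineDescriptor older=Queries.rowToDescriptor(rsEquivalents);
		// Purge every analysis table fed by the superseded routine
		for(ImportedTable table:analysisConfiguration.get(new AnalysisType(older)))
			table.cleanByImportRoutine(older,conn);
	}
}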

View File

@@ -35,6 +35,15 @@ public class Queries {
new DBField[] {DBField.Batch.fields.get(DBField.Batch.FARM_ID)});
public static final Query GET_OLDER_EQUIVALENT_IMPORT_ROUTINE=new Query("Select * from "+ImportRoutine.TABLE+" WHERE "
+ImportRoutine.BATCH_TYPE+"=? AND "
+ImportRoutine.SOURCE_URL+"=? AND "
+ImportRoutine.END+"<?",
new DBField[]{DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),
DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),
DBField.ImportRoutine.fields.get(ImportRoutine.END)});
// Imports locked by the given hostname, or not locked at all (lock IS NULL)
public static final Query ORPHAN_IMPORTS=new Query("SELECT * from "+ImportRoutine.TABLE+" where "+ImportRoutine.LOCK+" = ? OR "+ImportRoutine.LOCK+" IS NULL ",
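Note: the END&lt;? condition in GET_OLDER_EQUIVALENT_IMPORT_ROUTINE matches only strictly older routines, so the rows of the routine that triggered the cleanup are preserved. The DBField[] array fixes the binding order of the three placeholders (batch type, source URL, end timestamp); DBQueryDescriptor appears to be backed by a Map keyed on DBField (see its diff below), so the order of add() calls at the call site does not matter. The call site from removeOlderEquivalents above, for reference:

DBQueryDescriptor desc=new DBQueryDescriptor().
	add(DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),last.getBatch_type()).
	add(DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),last.getSourceUrl()).
	add(DBField.ImportRoutine.fields.get(ImportRoutine.END),new Timestamp(last.getEndTime().toEpochMilli()));
PreparedStatement ps=Queries.GET_OLDER_EQUIVALENT_IMPORT_ROUTINE.get(conn, desc);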

View File

@@ -4,6 +4,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
public class DBQueryDescriptor {
@@ -28,8 +30,7 @@ public class DBQueryDescriptor {
add(field,value);
}
public String toString() {
StringBuilder builder=new StringBuilder();
for(Entry<DBField,Object> entry : condition.entrySet()) {

View File

@@ -148,6 +148,4 @@ public class ImportRoutineDescriptor {
}
}

View File

@@ -168,7 +168,14 @@ public class ImportedTable {
return tablename;
}
public int cleanByImportRoutine(ImportRoutineDescriptor toClean,Connection conn) throws InvalidRequestException, SQLException {
DBField routineField=getRoutineIdField();
Query cleanQuery=new Query(String.format("DELETE FROM %1$s WHERE %2$s =?", this.tablename,routineField.getFieldName()),
new DBField[] {routineField});
return cleanQuery.get(conn, new DBQueryDescriptor(routineField,toClean.getId())).executeUpdate();
}
public File exportCSV(CSVExportRequest request) throws InvalidRequestException, SQLException, InternalException, IOException {
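Note: cleanByImportRoutine issues a parameterized DELETE keyed on the table's routine-id column and returns the number of rows removed. A hedged call-site sketch mirroring the loop in removeOlderEquivalents above, where older stands for a superseded ImportRoutineDescriptor and table for one of the ImportedTable instances registered for its analysis type:

// Drop all output rows produced by the superseded routine
int removed=table.cleanByImportRoutine(older,conn);
log.debug("Deleted {} output rows from {} for routine {}",removed,table.getTableName(),older.getId());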