From 37029e9e2d0c1d9785ade0b4ebc1c1ee5163ea24 Mon Sep 17 00:00:00 2001 From: "fabio.sinibaldi" Date: Thu, 24 Jan 2019 14:57:24 +0000 Subject: [PATCH] git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/application/perform-service@176798 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../service/engine/dm/ImporterMonitor.java | 13 ++++++-- .../engine/impl/PerformanceManagerImpl.java | 31 +++++++++++++++++-- .../perform/service/engine/impl/Queries.java | 9 ++++++ .../engine/model/DBQueryDescriptor.java | 5 +-- .../importer/ImportRoutineDescriptor.java | 2 -- .../engine/model/importer/ImportedTable.java | 9 +++++- 6 files changed, 60 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java b/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java index f6236d0..76b8589 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java +++ b/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java @@ -6,7 +6,10 @@ import java.sql.Timestamp; import java.time.Instant; import java.util.Map; +import javax.inject.Inject; + import org.gcube.application.perform.service.engine.DataBaseManager; +import org.gcube.application.perform.service.engine.PerformanceManager; import org.gcube.application.perform.service.engine.impl.Queries; import org.gcube.application.perform.service.engine.model.DBField; import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine; @@ -24,6 +27,9 @@ public class ImporterMonitor implements DMMonitorListener { private static final Logger log= LoggerFactory.getLogger(ImporterMonitor.class); + @Inject + private PerformanceManager performance; + public ImporterMonitor(ImportRoutineDescriptor routine) { super(); this.routine = routine; @@ -31,7 +37,6 @@ public class ImporterMonitor implements DMMonitorListener { private ImportRoutineDescriptor routine; - private ISQueryDescriptor isQuery; @Override public void accepted() { @@ -46,7 +51,11 @@ public class ImporterMonitor implements DMMonitorListener { @Override public void complete(double percentage) { updateStatus(ImportStatus.COMPLETE,routine); - loadOutputData(routine,isQuery); + try{ + performance.loadOutputData(routine); + }catch(Throwable t) { + updateStatus(ImportStatus.FAILED,routine); + } } @Override diff --git a/src/main/java/org/gcube/application/perform/service/engine/impl/PerformanceManagerImpl.java b/src/main/java/org/gcube/application/perform/service/engine/impl/PerformanceManagerImpl.java index 5c7d8bd..b407197 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/impl/PerformanceManagerImpl.java +++ b/src/main/java/org/gcube/application/perform/service/engine/impl/PerformanceManagerImpl.java @@ -6,8 +6,10 @@ import java.io.IOException; import java.io.Reader; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; @@ -25,6 +27,7 @@ import org.gcube.application.perform.service.engine.PerformanceManager; import org.gcube.application.perform.service.engine.dm.DMUtils; import org.gcube.application.perform.service.engine.model.CSVExportRequest; import org.gcube.application.perform.service.engine.model.DBField; +import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine; import org.gcube.application.perform.service.engine.model.DBQueryDescriptor; import org.gcube.application.perform.service.engine.model.InternalException; import org.gcube.application.perform.service.engine.model.InvalidRequestException; @@ -77,7 +80,9 @@ public class PerformanceManagerImpl implements PerformanceManager{ for(Entry entry:outputs.entrySet()) { parse(entry.getValue(),entry.getKey(),desc,conn); } - log.info("IMPORTED ALL FILES for {} ",desc); + log.info("IMPORTED ALL FILES for {}, gonna clean previous routines output. ",desc); + + removeOlderEquivalents(desc, conn); conn.commit(); }finally { conn.close(); @@ -135,7 +140,29 @@ public class PerformanceManagerImpl implements PerformanceManager{ return analysisConfiguration.get(request.getType()); } - + private static final void removeOlderEquivalents(ImportRoutineDescriptor last,Connection conn) throws SQLException, InvalidRequestException { + log.debug("Removing imports replaced by {} ",last); + + DBQueryDescriptor desc=new DBQueryDescriptor(). + add(DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),last.getBatch_type()). + add(DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),last.getSourceUrl()). + add(DBField.ImportRoutine.fields.get(ImportRoutine.END),new Timestamp(last.getEndTime().toEpochMilli())); + + + ResultSet rsEquivalents=Queries.GET_OLDER_EQUIVALENT_IMPORT_ROUTINE.get(conn, desc).executeQuery(); + + while(rsEquivalents.next()) { + ImportRoutineDescriptor older=Queries.rowToDescriptor(rsEquivalents); + log.debug("Removing outputs from {} ",older); + AnalysisType type=new AnalysisType(older); + for(ImportedTable table:analysisConfiguration.get(type)) { + log.debug("Cleaning {} of {} outputs",table.getTableName(),older); + table.cleanByImportRoutine(older,conn); + } + } + + + } private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException, InvalidRequestException { diff --git a/src/main/java/org/gcube/application/perform/service/engine/impl/Queries.java b/src/main/java/org/gcube/application/perform/service/engine/impl/Queries.java index 56bca55..4ececb4 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/impl/Queries.java +++ b/src/main/java/org/gcube/application/perform/service/engine/impl/Queries.java @@ -35,6 +35,15 @@ public class Queries { new DBField[] {DBField.Batch.fields.get(DBField.Batch.FARM_ID)}); + + public static final Query GET_OLDER_EQUIVALENT_IMPORT_ROUTINE=new Query("Select * from "+ImportRoutine.TABLE+" WHERE " + +ImportRoutine.BATCH_TYPE+"=? AND " + +ImportRoutine.SOURCE_URL+"=? AND " + +ImportRoutine.END+" entry : condition.entrySet()) { diff --git a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java index 50a8447..becba20 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java +++ b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java @@ -148,6 +148,4 @@ public class ImportRoutineDescriptor { } - - } diff --git a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportedTable.java b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportedTable.java index 4e80621..71afddd 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportedTable.java +++ b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportedTable.java @@ -168,7 +168,14 @@ public class ImportedTable { return tablename; } - + public int cleanByImportRoutine(ImportRoutineDescriptor toClean,Connection conn) throws InvalidRequestException, SQLException { + DBField routineField=getRoutineIdField(); + Query cleanQuery=new Query(String.format("DELETE FROM %1$s WHERE %2$s =?", this.tablename,routineField.getFieldName()), + new DBField[] {routineField}); + + return cleanQuery.get(conn, new DBQueryDescriptor(routineField,toClean.getId())).executeUpdate(); + + } public File exportCSV(CSVExportRequest request) throws InvalidRequestException, SQLException, InternalException, IOException {