diff --git a/src/main/java/org/gcube/application/perform/service/engine/Importer.java b/src/main/java/org/gcube/application/perform/service/engine/Importer.java index 5ba4d7e..277d046 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/Importer.java +++ b/src/main/java/org/gcube/application/perform/service/engine/Importer.java @@ -2,13 +2,13 @@ package org.gcube.application.perform.service.engine; import java.util.List; +import org.gcube.application.perform.service.engine.model.DBQueryDescriptor; import org.gcube.application.perform.service.engine.model.importer.ImportRequest; import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor; -import org.gcube.application.perform.service.engine.model.importer.ImportTicket; public interface Importer { public ImportRoutineDescriptor importExcel(ImportRequest request); - public List getDescriptors(ImportTicket ticket); + public List getDescriptors(DBQueryDescriptor query); } diff --git a/src/main/java/org/gcube/application/perform/service/engine/ImporterImpl.java b/src/main/java/org/gcube/application/perform/service/engine/ImporterImpl.java index 1a1c227..cbebbe1 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/ImporterImpl.java +++ b/src/main/java/org/gcube/application/perform/service/engine/ImporterImpl.java @@ -6,10 +6,13 @@ import java.sql.ResultSet; import java.util.List; import org.gcube.application.perform.service.LocalConfiguration; +import org.gcube.application.perform.service.engine.dm.DMUtils; +import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine; +import org.gcube.application.perform.service.engine.model.DBQueryDescriptor; import org.gcube.application.perform.service.engine.model.ISQueryDescriptor; +import org.gcube.application.perform.service.engine.model.InternalException; import org.gcube.application.perform.service.engine.model.importer.ImportRequest; import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor; -import org.gcube.application.perform.service.engine.model.importer.ImportTicket; import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; import lombok.Synchronized; @@ -18,6 +21,9 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ImporterImpl implements Importer { + private static final String ORPHAN_IMPORTS=""; + private static final String ACQUIRE_PS=""; + private static ISQueryDescriptor isQueryDescriptor=null; @@ -34,20 +40,47 @@ public class ImporterImpl implements Importer { } - public void init() { + private static final String getHostname() { + throw new RuntimeException("IMPLEMENT THIS"); + } + + public void init() throws InternalException{ + try { log.info("Initializing IMPORTER"); DataBaseManager db=DataBaseManager.get(getISQueryDescriptor()); Connection conn=db.getConnection(); - - PreparedStatement ps=conn.prepareStatement(ORPHAN_IMPORTS); + conn.setAutoCommit(true); + PreparedStatement psOrphans=conn.prepareStatement(ORPHAN_IMPORTS); + PreparedStatement psAcquire=conn.prepareStatement(ACQUIRE_PS); // set ps - ResultSet rsOrphans=ps.executeQuery(); + ResultSet rsOrphans=psOrphans.executeQuery(); long monitoredCount=0l; while(rsOrphans.next()) { - // acquire - // monitor + Long id=rsOrphans.getLong(ImportRoutine.ID); + try { + ImportRoutineDescriptor desc=rowToDescriptor(rsOrphans); + String hostname=getHostname(); + + // "acquire" + // set lock = hostname where ID =? and LOCK is null + // Acquired = updated rows == 1 + psAcquire.setString(1, hostname); + psAcquire.setLong(2, id); + + if(psAcquire.executeUpdate()>0) { + log.debug("Acquired {} ",id); + monitor(desc); + monitoredCount++; + } + }catch(Throwable t) { + log.warn("Unable to monitor orphan with ID {} ",id,t); + } } log.info("Acquired {} import executions for monitoring",monitoredCount); + }catch(Throwable t) { + log.warn("Unexpected Error while trying to check orphan import routines"); + throw new InternalException(t); + } } @@ -58,13 +91,13 @@ public class ImporterImpl implements Importer { log.debug("Registering {} computationID {} ",request,id); ImportRoutineDescriptor desc=register(id,request); log.debug("Monitoring {} computationID {} ",desc,id); - monitor(id,desc); + monitor(desc); return getDescriptorById(desc.getId()); } - private void monitor(ComputationId computationId,ImportRoutineDescriptor desc) { + private void monitor(ImportRoutineDescriptor desc) { throw new RuntimeException("IMPLEMENT THIS SHIT"); } private ComputationId submit(ImportRequest request) { @@ -81,8 +114,18 @@ public class ImporterImpl implements Importer { @Override - public List getDescriptors(ImportTicket ticket) { + public List getDescriptors(DBQueryDescriptor desc) { // TODO Auto-generated method stub return null; } + + + + + + + private static ImportRoutineDescriptor rowToDescriptor(ResultSet rs) { + throw new RuntimeException("IMPLEMENT THIS SHIT"); + } + } diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/DMException.java b/src/main/java/org/gcube/application/perform/service/engine/dm/DMException.java new file mode 100644 index 0000000..c3fab90 --- /dev/null +++ b/src/main/java/org/gcube/application/perform/service/engine/dm/DMException.java @@ -0,0 +1,37 @@ +package org.gcube.application.perform.service.engine.dm; + +public class DMException extends Exception { + + /** + * + */ + private static final long serialVersionUID = -3390718985211520032L; + + public DMException() { + super(); + // TODO Auto-generated constructor stub + } + + public DMException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public DMException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public DMException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public DMException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + + + +} diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/DMInterface.java b/src/main/java/org/gcube/application/perform/service/engine/dm/DMInterface.java deleted file mode 100644 index 7c7d61d..0000000 --- a/src/main/java/org/gcube/application/perform/service/engine/dm/DMInterface.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.gcube.application.perform.service.engine.dm; - -import java.util.Map; - -import org.gcube.application.perform.service.engine.model.InternalException; - -public interface DMInterface { - - public static DMInterface get() { - return new DMInterfaceImpl(); - } - - // Submit job and registers listener - public void submitJob(DMListenerCallback callback, String operatorId, Map parameters) throws InternalException; - -} diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/DMListener.java b/src/main/java/org/gcube/application/perform/service/engine/dm/DMListener.java deleted file mode 100644 index 728521f..0000000 --- a/src/main/java/org/gcube/application/perform/service/engine/dm/DMListener.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.gcube.application.perform.service.engine.dm; - -import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor; -import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener; - -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@AllArgsConstructor -public class DMListener implements DMMonitorListener { - - private DMListenerCallback callback; - private DMMonitor selfmonitor; - - - - @Override - public void running(double percentage) { - log.debug("Operation Running: " + percentage); - callback.running(percentage); - } - - @Override - public void failed(String message, Exception exception) { - log.error("Operation failed : "+message, exception); - callback.failed(message, exception); - onFinishedComputation(); - } - - @Override - public void complete(double percentage) { - log.debug("Operation Completed. Perc: " + percentage); - onFinishedComputation(); - callback.complete(percentage); - } - - @Override - public void cancelled() { - log.debug("Operation Cancelled"); - onFinishedComputation(); - callback.cancelled(); - } - - @Override - public void accepted() { - log.debug("Operation Accepted"); - callback.accepted(); - } - - - private void onFinishedComputation() { - selfmonitor.cancel(); - } -} diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/DMListenerCallback.java b/src/main/java/org/gcube/application/perform/service/engine/dm/DMListenerCallback.java deleted file mode 100644 index 0a1da73..0000000 --- a/src/main/java/org/gcube/application/perform/service/engine/dm/DMListenerCallback.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.gcube.application.perform.service.engine.dm; - -import java.util.Map; - -import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener; -import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; - -public interface DMListenerCallback extends DMMonitorListener{ - - - public void onSubmitted(ComputationId computationId, Map parameters); - -} diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/DMInterfaceImpl.java b/src/main/java/org/gcube/application/perform/service/engine/dm/DMUtils.java similarity index 52% rename from src/main/java/org/gcube/application/perform/service/engine/dm/DMInterfaceImpl.java rename to src/main/java/org/gcube/application/perform/service/engine/dm/DMUtils.java index 5cf665d..b523012 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/dm/DMInterfaceImpl.java +++ b/src/main/java/org/gcube/application/perform/service/engine/dm/DMUtils.java @@ -2,10 +2,10 @@ package org.gcube.application.perform.service.engine.dm; import java.util.Map; -import org.gcube.application.perform.service.engine.model.InternalException; import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService; import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient; import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor; +import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener; import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId; import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter; import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator; @@ -13,19 +13,28 @@ import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator; import lombok.extern.slf4j.Slf4j; @Slf4j -public class DMInterfaceImpl implements DMInterface{ +public class DMUtils { - - - - - @Override - public void submitJob(DMListenerCallback callback, String operatorId, Map parameters) throws InternalException { + public static SClient getClient() throws DMException { + try { + return new DataMinerService().getClient(); + }catch(Exception e) { + throw new DMException(e); + } + } + + + public static void monitor(SClient client,ComputationId computationId,DMMonitorListener listener) { + DMMonitor monitor=new DMMonitor(computationId,client); + monitor.add(listener); + monitor.start(); + } + + + public static ComputationId submitJob(SClient client, String operatorId, Map parameters) throws DMException { try { - log.debug("Looking for DM .."); - SClient dmClient=new DataMinerService().getClient(); log.debug("Looking for operator by Id {} ",operatorId); - Operator op=dmClient.getOperatorById(operatorId); + Operator op=client.getOperatorById(operatorId); log.debug("Preparing parameters, values : {} ",parameters); @@ -37,25 +46,10 @@ public class DMInterfaceImpl implements DMInterface{ log.info("Submitting Operator {} to DM",op); - ComputationId compid=dmClient.startComputation(op); - - - log.debug("Calling onSubmitted for {} , {} ",compid,parameters); - callback.onSubmitted(compid, parameters); - - log.debug("Registering monitor"); - monitor(callback,compid,dmClient); + return client.startComputation(op); }catch(Exception e) { - throw new InternalException("Unable to submit to DM."); + throw new DMException(e); } } - - - - private void monitor(DMListenerCallback callback, ComputationId computatationId,SClient dmClient) { - DMMonitor monitor=new DMMonitor(computatationId, dmClient); - monitor.add(new DMListener(callback,monitor)); - monitor.start(); - } } diff --git a/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java b/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java new file mode 100644 index 0000000..72c7ce2 --- /dev/null +++ b/src/main/java/org/gcube/application/perform/service/engine/dm/ImporterMonitor.java @@ -0,0 +1,69 @@ +package org.gcube.application.perform.service.engine.dm; + +import org.gcube.application.perform.service.engine.model.ISQueryDescriptor; +import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor; +import org.gcube.application.perform.service.engine.model.importer.ImportStatus; +import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@AllArgsConstructor +@Slf4j +public class ImporterMonitor implements DMMonitorListener { + + private static final String UPDATE_ROUTINE=""; + + + private ImportRoutineDescriptor routine; + private ISQueryDescriptor isQuery; + + @Override + public void accepted() { + updateStatus(ImportStatus.ACCEPTED,routine,isQuery); + } + + @Override + public void cancelled() { + updateStatus(ImportStatus.CANCELLED,routine,isQuery); + } + + @Override + public void complete(double percentage) { + updateStatus(ImportStatus.COMPLETE,routine,isQuery); + loadOutputData(routine,isQuery); + } + + @Override + public void failed(String message, Exception exception) { + updateStatus(ImportStatus.FAILED,routine,isQuery); + } + + @Override + public void running(double percentage) { + updateStatus(ImportStatus.RUNNING,routine,isQuery); + } + + + + private static final void updateStatus(ImportStatus status,ImportRoutineDescriptor routine,ISQueryDescriptor is) { + throw new RuntimeException("NEED TO SET PS STMT"); +// try{ +// log.debug("Updateing status {} for {} ",status,routine); +// DataBaseManager db=DataBaseManager.get(is); +// Connection conn=db.getConnection(); +// conn.setAutoCommit(true); +// PreparedStatement psUpdate=conn.prepareStatement(UPDATE_ROUTINE); +// //TODO SET PARAMS +// +// psUpdate.executeUpdate(); +// }catch(Throwable t) { +// log.warn("Unable to update status on database"); +// } + } + + + private static final void loadOutputData(ImportRoutineDescriptor routine, ISQueryDescriptor is) { + throw new RuntimeException("NEED TO IMPLEMENT THIS"); + } +} diff --git a/src/main/java/org/gcube/application/perform/service/engine/model/DBField.java b/src/main/java/org/gcube/application/perform/service/engine/model/DBField.java index cf46fe2..8efef7f 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/model/DBField.java +++ b/src/main/java/org/gcube/application/perform/service/engine/model/DBField.java @@ -50,6 +50,52 @@ public class DBField { } } + public static class ImportRoutine{ + public static final Map fields=new HashMap<>(); + + public static final String ID="id"; + public static final String FARM_ID="farmid"; + public static final String BATCH_TYPE="batch_type"; + public static final String SOURCE_URL="sourceurl"; + public static final String SOURCE_VERSION="sourceversion"; + + public static final String START="start"; + public static final String END="end"; + public static final String STATUS="status"; + public static final String CALLER="caller"; + public static final String COMPUTATION_ID="computation_id"; + public static final String COMPUTATION_URL="computation_url"; + public static final String COMPUTATION_OPID="computation_opid"; + public static final String COMPUTATION_OPNAME="computation_opname"; + public static final String COMPUTATION_REQ="computation_req"; + public static final String LOCK="lock"; + + + + + + static { + fields.put(FARM_ID, new DBField(Types.BIGINT,FARM_ID)); + fields.put(ID, new DBField(Types.BIGINT,ID)); + fields.put(BATCH_TYPE, new DBField(Types.NVARCHAR,BATCH_TYPE)); + fields.put(SOURCE_URL, new DBField(Types.NVARCHAR,SOURCE_URL)); + fields.put(SOURCE_VERSION, new DBField(Types.INTEGER,SOURCE_VERSION)); + + fields.put(START, new DBField(Types.TIMESTAMP_WITH_TIMEZONE,START)); + fields.put(END, new DBField(Types.TIMESTAMP_WITH_TIMEZONE,END)); + fields.put(STATUS, new DBField(Types.NVARCHAR,STATUS)); + fields.put(CALLER, new DBField(Types.NVARCHAR,CALLER)); + fields.put(COMPUTATION_ID, new DBField(Types.NVARCHAR,COMPUTATION_ID)); + fields.put(COMPUTATION_URL, new DBField(Types.NVARCHAR,COMPUTATION_URL)); + fields.put(COMPUTATION_OPID, new DBField(Types.NVARCHAR,COMPUTATION_OPID)); + fields.put(COMPUTATION_OPNAME, new DBField(Types.NVARCHAR,COMPUTATION_OPNAME)); + fields.put(COMPUTATION_REQ, new DBField(Types.NVARCHAR,COMPUTATION_REQ)); + fields.put(LOCK, new DBField(Types.NVARCHAR,LOCK)); + } + } + + + private int type; private String fieldName; diff --git a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java index 5af687d..be614f9 100644 --- a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java +++ b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportRoutineDescriptor.java @@ -14,13 +14,26 @@ import lombok.Setter; public class ImportRoutineDescriptor { + + + private Long id; - private String sourceFile; + private Long farmId; + private String batch_type; + private String sourceUrl; + private Integer sourceVersion; + private Instant startTime; private Instant endTime; - private Long producedKpiRows; - private Long producedAggregatedKpiRows; + private String status; private String executionID; - private String userToken; + private String caller; + + private String computationId; + private String computationUrl; + private String computationOperator; + private String computationOperatorName; + private String computationRequest; + } diff --git a/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportStatus.java b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportStatus.java new file mode 100644 index 0000000..5fb1216 --- /dev/null +++ b/src/main/java/org/gcube/application/perform/service/engine/model/importer/ImportStatus.java @@ -0,0 +1,5 @@ +package org.gcube.application.perform.service.engine.model.importer; + +public enum ImportStatus { + ACCEPTED, RUNNING, COMPLETE, FAILED, CANCELLED +}