This commit is contained in:
Fabio Sinibaldi 2019-01-02 17:38:21 +00:00
parent ad6a753e2d
commit 5b2124e74e
11 changed files with 251 additions and 128 deletions

View File

@ -2,13 +2,13 @@ package org.gcube.application.perform.service.engine;
import java.util.List;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportRequest;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportTicket;
public interface Importer {
public ImportRoutineDescriptor importExcel(ImportRequest request);
public List<ImportRoutineDescriptor> getDescriptors(ImportTicket ticket);
public List<ImportRoutineDescriptor> getDescriptors(DBQueryDescriptor query);
}

View File

@ -6,10 +6,13 @@ import java.sql.ResultSet;
import java.util.List;
import org.gcube.application.perform.service.LocalConfiguration;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.importer.ImportRequest;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportTicket;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import lombok.Synchronized;
@ -18,6 +21,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ImporterImpl implements Importer {
private static final String ORPHAN_IMPORTS="";
private static final String ACQUIRE_PS="";
private static ISQueryDescriptor isQueryDescriptor=null;
@ -34,20 +40,47 @@ public class ImporterImpl implements Importer {
}
public void init() {
private static final String getHostname() {
throw new RuntimeException("IMPLEMENT THIS");
}
public void init() throws InternalException{
try {
log.info("Initializing IMPORTER");
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
PreparedStatement ps=conn.prepareStatement(ORPHAN_IMPORTS);
conn.setAutoCommit(true);
PreparedStatement psOrphans=conn.prepareStatement(ORPHAN_IMPORTS);
PreparedStatement psAcquire=conn.prepareStatement(ACQUIRE_PS);
// set ps
ResultSet rsOrphans=ps.executeQuery();
ResultSet rsOrphans=psOrphans.executeQuery();
long monitoredCount=0l;
while(rsOrphans.next()) {
// acquire
// monitor
Long id=rsOrphans.getLong(ImportRoutine.ID);
try {
ImportRoutineDescriptor desc=rowToDescriptor(rsOrphans);
String hostname=getHostname();
// "acquire"
// set lock = hostname where ID =? and LOCK is null
// Acquired = updated rows == 1
psAcquire.setString(1, hostname);
psAcquire.setLong(2, id);
if(psAcquire.executeUpdate()>0) {
log.debug("Acquired {} ",id);
monitor(desc);
monitoredCount++;
}
}catch(Throwable t) {
log.warn("Unable to monitor orphan with ID {} ",id,t);
}
}
log.info("Acquired {} import executions for monitoring",monitoredCount);
}catch(Throwable t) {
log.warn("Unexpected Error while trying to check orphan import routines");
throw new InternalException(t);
}
}
@ -58,13 +91,13 @@ public class ImporterImpl implements Importer {
log.debug("Registering {} computationID {} ",request,id);
ImportRoutineDescriptor desc=register(id,request);
log.debug("Monitoring {} computationID {} ",desc,id);
monitor(id,desc);
monitor(desc);
return getDescriptorById(desc.getId());
}
private void monitor(ComputationId computationId,ImportRoutineDescriptor desc) {
private void monitor(ImportRoutineDescriptor desc) {
throw new RuntimeException("IMPLEMENT THIS SHIT");
}
private ComputationId submit(ImportRequest request) {
@ -81,8 +114,18 @@ public class ImporterImpl implements Importer {
@Override
public List<ImportRoutineDescriptor> getDescriptors(ImportTicket ticket) {
public List<ImportRoutineDescriptor> getDescriptors(DBQueryDescriptor desc) {
// TODO Auto-generated method stub
return null;
}
private static ImportRoutineDescriptor rowToDescriptor(ResultSet rs) {
throw new RuntimeException("IMPLEMENT THIS SHIT");
}
}

View File

@ -0,0 +1,37 @@
package org.gcube.application.perform.service.engine.dm;
public class DMException extends Exception {
/**
*
*/
private static final long serialVersionUID = -3390718985211520032L;
public DMException() {
super();
// TODO Auto-generated constructor stub
}
public DMException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
// TODO Auto-generated constructor stub
}
public DMException(String message, Throwable cause) {
super(message, cause);
// TODO Auto-generated constructor stub
}
public DMException(String message) {
super(message);
// TODO Auto-generated constructor stub
}
public DMException(Throwable cause) {
super(cause);
// TODO Auto-generated constructor stub
}
}

View File

@ -1,16 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.application.perform.service.engine.model.InternalException;
public interface DMInterface {
public static DMInterface get() {
return new DMInterfaceImpl();
}
// Submit job and registers listener
public void submitJob(DMListenerCallback callback, String operatorId, Map<String,String> parameters) throws InternalException;
}

View File

@ -1,55 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
public class DMListener implements DMMonitorListener {
private DMListenerCallback callback;
private DMMonitor selfmonitor;
@Override
public void running(double percentage) {
log.debug("Operation Running: " + percentage);
callback.running(percentage);
}
@Override
public void failed(String message, Exception exception) {
log.error("Operation failed : "+message, exception);
callback.failed(message, exception);
onFinishedComputation();
}
@Override
public void complete(double percentage) {
log.debug("Operation Completed. Perc: " + percentage);
onFinishedComputation();
callback.complete(percentage);
}
@Override
public void cancelled() {
log.debug("Operation Cancelled");
onFinishedComputation();
callback.cancelled();
}
@Override
public void accepted() {
log.debug("Operation Accepted");
callback.accepted();
}
private void onFinishedComputation() {
selfmonitor.cancel();
}
}

View File

@ -1,13 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
public interface DMListenerCallback extends DMMonitorListener{
public void onSubmitted(ComputationId computationId, Map<String,String> parameters);
}

View File

@ -2,10 +2,10 @@ package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
@ -13,19 +13,28 @@ import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DMInterfaceImpl implements DMInterface{
public class DMUtils {
@Override
public void submitJob(DMListenerCallback callback, String operatorId, Map<String, String> parameters) throws InternalException {
public static SClient getClient() throws DMException {
try {
return new DataMinerService().getClient();
}catch(Exception e) {
throw new DMException(e);
}
}
public static void monitor(SClient client,ComputationId computationId,DMMonitorListener listener) {
DMMonitor monitor=new DMMonitor(computationId,client);
monitor.add(listener);
monitor.start();
}
public static ComputationId submitJob(SClient client, String operatorId, Map<String,String> parameters) throws DMException {
try {
log.debug("Looking for DM ..");
SClient dmClient=new DataMinerService().getClient();
log.debug("Looking for operator by Id {} ",operatorId);
Operator op=dmClient.getOperatorById(operatorId);
Operator op=client.getOperatorById(operatorId);
log.debug("Preparing parameters, values : {} ",parameters);
@ -37,25 +46,10 @@ public class DMInterfaceImpl implements DMInterface{
log.info("Submitting Operator {} to DM",op);
ComputationId compid=dmClient.startComputation(op);
log.debug("Calling onSubmitted for {} , {} ",compid,parameters);
callback.onSubmitted(compid, parameters);
log.debug("Registering monitor");
monitor(callback,compid,dmClient);
return client.startComputation(op);
}catch(Exception e) {
throw new InternalException("Unable to submit to DM.");
throw new DMException(e);
}
}
private void monitor(DMListenerCallback callback, ComputationId computatationId,SClient dmClient) {
DMMonitor monitor=new DMMonitor(computatationId, dmClient);
monitor.add(new DMListener(callback,monitor));
monitor.start();
}
}

View File

@ -0,0 +1,69 @@
package org.gcube.application.perform.service.engine.dm;
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportStatus;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@AllArgsConstructor
@Slf4j
public class ImporterMonitor implements DMMonitorListener {
private static final String UPDATE_ROUTINE="";
private ImportRoutineDescriptor routine;
private ISQueryDescriptor isQuery;
@Override
public void accepted() {
updateStatus(ImportStatus.ACCEPTED,routine,isQuery);
}
@Override
public void cancelled() {
updateStatus(ImportStatus.CANCELLED,routine,isQuery);
}
@Override
public void complete(double percentage) {
updateStatus(ImportStatus.COMPLETE,routine,isQuery);
loadOutputData(routine,isQuery);
}
@Override
public void failed(String message, Exception exception) {
updateStatus(ImportStatus.FAILED,routine,isQuery);
}
@Override
public void running(double percentage) {
updateStatus(ImportStatus.RUNNING,routine,isQuery);
}
private static final void updateStatus(ImportStatus status,ImportRoutineDescriptor routine,ISQueryDescriptor is) {
throw new RuntimeException("NEED TO SET PS STMT");
// try{
// log.debug("Updateing status {} for {} ",status,routine);
// DataBaseManager db=DataBaseManager.get(is);
// Connection conn=db.getConnection();
// conn.setAutoCommit(true);
// PreparedStatement psUpdate=conn.prepareStatement(UPDATE_ROUTINE);
// //TODO SET PARAMS
//
// psUpdate.executeUpdate();
// }catch(Throwable t) {
// log.warn("Unable to update status on database");
// }
}
private static final void loadOutputData(ImportRoutineDescriptor routine, ISQueryDescriptor is) {
throw new RuntimeException("NEED TO IMPLEMENT THIS");
}
}

View File

@ -50,6 +50,52 @@ public class DBField {
}
}
public static class ImportRoutine{
public static final Map<String,DBField> fields=new HashMap<>();
public static final String ID="id";
public static final String FARM_ID="farmid";
public static final String BATCH_TYPE="batch_type";
public static final String SOURCE_URL="sourceurl";
public static final String SOURCE_VERSION="sourceversion";
public static final String START="start";
public static final String END="end";
public static final String STATUS="status";
public static final String CALLER="caller";
public static final String COMPUTATION_ID="computation_id";
public static final String COMPUTATION_URL="computation_url";
public static final String COMPUTATION_OPID="computation_opid";
public static final String COMPUTATION_OPNAME="computation_opname";
public static final String COMPUTATION_REQ="computation_req";
public static final String LOCK="lock";
static {
fields.put(FARM_ID, new DBField(Types.BIGINT,FARM_ID));
fields.put(ID, new DBField(Types.BIGINT,ID));
fields.put(BATCH_TYPE, new DBField(Types.NVARCHAR,BATCH_TYPE));
fields.put(SOURCE_URL, new DBField(Types.NVARCHAR,SOURCE_URL));
fields.put(SOURCE_VERSION, new DBField(Types.INTEGER,SOURCE_VERSION));
fields.put(START, new DBField(Types.TIMESTAMP_WITH_TIMEZONE,START));
fields.put(END, new DBField(Types.TIMESTAMP_WITH_TIMEZONE,END));
fields.put(STATUS, new DBField(Types.NVARCHAR,STATUS));
fields.put(CALLER, new DBField(Types.NVARCHAR,CALLER));
fields.put(COMPUTATION_ID, new DBField(Types.NVARCHAR,COMPUTATION_ID));
fields.put(COMPUTATION_URL, new DBField(Types.NVARCHAR,COMPUTATION_URL));
fields.put(COMPUTATION_OPID, new DBField(Types.NVARCHAR,COMPUTATION_OPID));
fields.put(COMPUTATION_OPNAME, new DBField(Types.NVARCHAR,COMPUTATION_OPNAME));
fields.put(COMPUTATION_REQ, new DBField(Types.NVARCHAR,COMPUTATION_REQ));
fields.put(LOCK, new DBField(Types.NVARCHAR,LOCK));
}
}
private int type;
private String fieldName;

View File

@ -14,13 +14,26 @@ import lombok.Setter;
public class ImportRoutineDescriptor {
private Long id;
private String sourceFile;
private Long farmId;
private String batch_type;
private String sourceUrl;
private Integer sourceVersion;
private Instant startTime;
private Instant endTime;
private Long producedKpiRows;
private Long producedAggregatedKpiRows;
private String status;
private String executionID;
private String userToken;
private String caller;
private String computationId;
private String computationUrl;
private String computationOperator;
private String computationOperatorName;
private String computationRequest;
}

View File

@ -0,0 +1,5 @@
package org.gcube.application.perform.service.engine.model.importer;
public enum ImportStatus {
ACCEPTED, RUNNING, COMPLETE, FAILED, CANCELLED
}