This commit is contained in:
Fabio Sinibaldi 2018-12-21 18:22:28 +00:00
parent 99e8c65b93
commit 51cfe3d776
7 changed files with 121 additions and 46 deletions

View File

@ -1,13 +1,16 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.application.perform.service.engine.model.InternalException;
public interface DMInterface {
public static DMInterface get() {
return new DMInterfaceImpl();
}
public void submitJob();
public void getJobStatus();
public void getJobOutput();
// Submit job and registers listener
public void submitJob(DMListenerCallback callback, String operatorId, Map<String,String> parameters) throws InternalException;
}

View File

@ -1,26 +1,61 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DMInterfaceImpl implements DMInterface{
@Override
public void submitJob() {
// TODO Auto-generated method stub
public void submitJob(DMListenerCallback callback, String operatorId, Map<String, String> parameters) throws InternalException {
try {
log.debug("Looking for DM ..");
SClient dmClient=new DataMinerService().getClient();
log.debug("Looking for operator by Id {} ",operatorId);
Operator op=dmClient.getOperatorById(operatorId);
}
log.debug("Preparing parameters, values : {} ",parameters);
for(Parameter param:op.getOperatorParameters()) {
String paramName=param.getName();
if(parameters.containsKey(paramName))
param.setValue(parameters.get(paramName));
}
@Override
public void getJobStatus() {
// TODO Auto-generated method stub
log.info("Submitting Operator {} to DM",op);
}
@Override
public void getJobOutput() {
// TODO Auto-generated method stub
ComputationId compid=dmClient.startComputation(op);
log.debug("Calling onSubmitted for {} , {} ",compid,parameters);
callback.onSubmitted(compid, parameters);
log.debug("Registering monitor");
monitor(callback,compid,dmClient);
}catch(Exception e) {
throw new InternalException("Unable to submit to DM.");
}
}
private void monitor(DMListenerCallback callback, ComputationId computatationId,SClient dmClient) {
DMMonitor monitor=new DMMonitor(computatationId, dmClient);
monitor.add(new DMListener(callback,monitor));
monitor.start();
}
}

View File

@ -0,0 +1,55 @@
package org.gcube.application.perform.service.engine.dm;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
public class DMListener implements DMMonitorListener {
private DMListenerCallback callback;
private DMMonitor selfmonitor;
@Override
public void running(double percentage) {
log.debug("Operation Running: " + percentage);
callback.running(percentage);
}
@Override
public void failed(String message, Exception exception) {
log.error("Operation failed : "+message, exception);
callback.failed(message, exception);
onFinishedComputation();
}
@Override
public void complete(double percentage) {
log.debug("Operation Completed. Perc: " + percentage);
onFinishedComputation();
callback.complete(percentage);
}
@Override
public void cancelled() {
log.debug("Operation Cancelled");
onFinishedComputation();
callback.cancelled();
}
@Override
public void accepted() {
log.debug("Operation Accepted");
callback.accepted();
}
private void onFinishedComputation() {
selfmonitor.cancel();
}
}

View File

@ -0,0 +1,13 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.Map;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
public interface DMListenerCallback extends DMMonitorListener{
public void onSubmitted(ComputationId computationId, Map<String,String> parameters);
}

View File

@ -1,21 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
public abstract class DMPoolMonitor<T extends PostProcess>{
private T PostProcess;
private PoolMonitorConfiguration configuration;
public DMPoolMonitor(T PostProcess){
// INIT THREAD POOL
// .. queue, core/max size
}
public void submitRequest() {
}
}

View File

@ -1,5 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
public class PoolMonitorConfiguration {
}

View File

@ -1,5 +0,0 @@
package org.gcube.application.perform.service.engine.dm;
public interface PostProcess {
}