This commit is contained in:
Fabio Sinibaldi 2019-01-25 16:43:09 +00:00
parent decc90cccd
commit c060f291ce
17 changed files with 495 additions and 120 deletions

View File: .settings/org.eclipse.wst.common.component

@ -4,6 +4,9 @@
<wb-resource deploy-path="/" source-path="/src/main/webapp" tag="defaultRootSource"/>
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/java"/>
<wb-resource deploy-path="/WEB-INF/classes" source-path="/src/main/resources"/>
<dependent-module archiveName="data-miner-manager-cl-1.6.0-SNAPSHOT.jar" deploy-path="/WEB-INF/lib" handle="module:/resource/data-miner-manager-cl/data-miner-manager-cl">
<dependency-type>uses</dependency-type>
</dependent-module>
<property name="context-root" value="perform-service"/>
<property name="java-output-path" value="/perform-service/target/classes"/>
</wb-module>

View File: PerformanceManager.java

@ -4,6 +4,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
@ -13,5 +14,5 @@ public interface PerformanceManager {
public Map<String,String> generateCSV(CSVExportRequest request)throws SQLException, InvalidRequestException, InternalException, IOException;
public void loadOutputData(ImportRoutineDescriptor desc)throws SQLException, InvalidRequestException, InternalException, IOException;
public void loadOutputData(ImportRoutineDescriptor desc)throws SQLException, InvalidRequestException, InternalException, IOException, DMException;
}

View File: AsynchDMMonitor.java

@ -0,0 +1,130 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.ArrayList;
import java.util.Date;
import java.util.Timer;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorTask;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsynchDMMonitor extends DMMonitor {
private static final Logger log= LoggerFactory.getLogger(AsynchDMMonitor.class);
private int sleep = 2000; // Sleep duration in milliseconds
private int delay = 2000; // Delay on first check, in milliseconds
private int period = 2000; // Interval between monitoring requests, in milliseconds
private ComputationId computationId;
private SClient sClient;
private ArrayList<DMMonitorListener> listeners = new ArrayList<DMMonitorListener>();
private Timer timer;
private boolean notEnd;
public AsynchDMMonitor(ComputationId computationId, SClient sClient) {
super(computationId, sClient);
this.computationId = computationId;
this.sClient = sClient;
}
@Override
public void start() {
throw new RuntimeException("Unecpetcted call to start() method");
}
public void startAsynch() throws DMException {
try {
log.debug("Start Monitoring");
notEnd = true;
timer = new Timer(false);
DMMonitorTask dmMonitorTask = new DMMonitorTask(this,
computationId, sClient, listeners);
log.debug("Start: " + new Date());
timer.schedule(dmMonitorTask, delay, period);
log.debug("Scheduled.");
} catch (Throwable e) {
throw new DMException(e);
}
}
public void add(DMMonitorListener listener) {
listeners.add(listener);
}
public void addAll(ArrayList<DMMonitorListener> listeners) {
this.listeners.addAll(listeners);
}
public void cancel() {
if (timer != null)
timer.cancel();
notEnd = false;
}
/**
*
* @return Sleep duration in milliseconds
*/
public int getSleep() {
return sleep;
}
/**
*
* @param sleep Sleep duration in milliseconds
*/
public void setSleep(int sleep) {
this.sleep = sleep;
}
/**
*
* @return Delay on first check, in milliseconds
*/
public int getDelay() {
return delay;
}
/**
*
* @param delay Delay on first check, in milliseconds
*/
public void setDelay(int delay) {
this.delay = delay;
}
/**
*
* @return Interval between monitoring requests, in milliseconds
*/
public int getPeriod() {
return period;
}
/**
*
* @param period Interval between monitoring requests, in milliseconds
*/
public void setPeriod(int period) {
this.period = period;
}
}
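For context, a minimal usage sketch of the new non-blocking monitor (the helper method name is illustrative; the listener wiring mirrors DMUtils.monitor below):
import org.gcube.application.perform.service.engine.dm.AsynchDMMonitor;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
public class AsynchMonitorSketch {
	// Hypothetical helper: registers a listener and returns immediately.
	// Unlike DMMonitor.start(), startAsynch() only schedules a Timer task,
	// so the calling thread (e.g. a REST request) is not blocked until the computation ends.
	static void watch(ComputationId computationId, DMMonitorListener listener) throws DMException {
		AsynchDMMonitor monitor = new AsynchDMMonitor(computationId, DMUtils.getClient());
		monitor.add(listener);
		monitor.startAsynch();
	}
}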

View File: DMUtils.java

@ -1,5 +1,7 @@
package org.gcube.application.perform.service.engine.dm;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
@ -7,7 +9,11 @@ import org.gcube.data.analysis.dataminermanagercl.server.DataMinerService;
import org.gcube.data.analysis.dataminermanagercl.server.dmservice.SClient;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitor;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.OutputData;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.FileResource;
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.MapResource;
import org.gcube.data.analysis.dataminermanagercl.shared.data.output.Resource;
import org.gcube.data.analysis.dataminermanagercl.shared.parameters.Parameter;
import org.gcube.data.analysis.dataminermanagercl.shared.process.Operator;
import org.slf4j.Logger;
@ -17,60 +23,87 @@ import org.slf4j.LoggerFactory;
public class DMUtils {
private static final Logger log= LoggerFactory.getLogger(DMUtils.class);
public static SClient getClient() throws DMException {
try {
return new DataMinerService().getClient();
return new DataMinerService().getClient();
}catch(Exception e) {
throw new DMException(e);
}
}
public static ComputationId getComputation(ImportRoutineDescriptor desc) {
return new ComputationId(desc.getComputationId(), desc.getComputationUrl(), desc.getComputationOperator(), desc.getComputationOperatorName(), desc.getComputationRequest());
}
public static void monitor(SClient client,ComputationId computationId,DMMonitorListener listener) {
DMMonitor monitor=new DMMonitor(computationId,client);
monitor.add(listener);
monitor.start();
}
public static void monitor(ComputationId computationId,DMMonitorListener listener) throws DMException {
DMMonitor monitor=new DMMonitor(computationId,getClient());
AsynchDMMonitor monitor=new AsynchDMMonitor(computationId,getClient());
monitor.add(listener);
monitor.start();
monitor.startAsynch();
}
public static ComputationId submitJob(String operatorId, Map<String,String> parameters) throws DMException {
return submitJob(getClient(),operatorId,parameters);
}
public static ComputationId submitJob(SClient client, String operatorId, Map<String,String> parameters) throws DMException {
try {
log.debug("Looking for operator by Id {} ",operatorId);
Operator op=client.getOperatorById(operatorId);
log.debug("Preparing parameters, values : {} ",parameters);
for(Parameter param:op.getOperatorParameters()) {
String paramName=param.getName();
if(parameters.containsKey(paramName))
param.setValue(parameters.get(paramName));
}
log.debug("Looking for operator by Id {} ",operatorId);
Operator op=client.getOperatorById(operatorId);
log.info("Submitting Operator {} to DM",op);
return client.startComputation(op);
List<Parameter> params=client.getInputParameters(op);
log.debug("Preparing parameters, values : {} ",parameters);
for(Parameter param:params) {
String paramName=param.getName();
if(parameters.containsKey(paramName))
param.setValue(parameters.get(paramName));
}
op.setOperatorParameters(params);
log.info("Submitting Operator {} to DM",op);
return client.startComputation(op);
}catch(Exception e) {
throw new DMException(e);
}
}
public static final Map<String,String> getOutputFiles(ComputationId computationId){
throw new RuntimeException("Missing Implementation");
public static final Map<String,String> getOutputFiles(ComputationId computationId) throws DMException{
try{
Map<String,String> toReturn=new HashMap<String,String>();
SClient client=getClient();
OutputData data=client.getOutputDataByComputationId(computationId);
Resource resource = data.getResource();
if (resource.isMap()) {
MapResource mapResource = (MapResource) resource;
for (String key : mapResource.getMap().keySet()) {
Resource res = mapResource.getMap().get(key);
switch (res.getResourceType()) {
case FILE:
FileResource fileResource = (FileResource) res;
toReturn.put(fileResource.getDescription(), fileResource.getUrl());
break;
}
}
}
return toReturn;
}catch(Exception e) {
throw new DMException(e);
}
}
}
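A short sketch of how a caller might consume the new getOutputFiles result; the map keys are the FileResource descriptions and the values their download URLs (the wrapper class below is illustrative):
import java.util.Map;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
class OutputFilesSketch {
	static void printOutputs(ComputationId computationId) throws DMException {
		// description -> download URL, one entry per FILE resource in the computation output map
		Map<String, String> outputs = DMUtils.getOutputFiles(computationId);
		for (Map.Entry<String, String> entry : outputs.entrySet())
			System.out.println(entry.getKey() + " -> " + entry.getValue());
	}
}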

View File: ImporterMonitor.java

@ -4,7 +4,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;
import javax.inject.Inject;
@ -14,11 +13,9 @@ import org.gcube.application.perform.service.engine.impl.Queries;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportStatus;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,16 +24,17 @@ public class ImporterMonitor implements DMMonitorListener {
private static final Logger log= LoggerFactory.getLogger(ImporterMonitor.class);
@Inject
private PerformanceManager performance;
public ImporterMonitor(ImportRoutineDescriptor routine) {
super();
this.routine = routine;
}
private ImportRoutineDescriptor routine;
public ImporterMonitor(PerformanceManager performance, ImportRoutineDescriptor routine) {
super();
this.performance = performance;
this.routine = routine;
}
@Override
public void accepted() {
@ -50,10 +48,12 @@ public class ImporterMonitor implements DMMonitorListener {
@Override
public void complete(double percentage) {
updateStatus(ImportStatus.COMPLETE,routine);
try{
performance.loadOutputData(routine);
updateStatus(ImportStatus.COMPLETE,routine);
log.debug("Completed monitoring of {} ",routine);
}catch(Throwable t) {
log.error("Unable to load output data for "+routine,t);
updateStatus(ImportStatus.FAILED,routine);
}
}
@ -102,12 +102,5 @@ public class ImporterMonitor implements DMMonitorListener {
}
private static final void loadOutputData(ImportRoutineDescriptor routine, ISQueryDescriptor is) {
log.debug("Loading output data for {} ",routine);
ComputationId id=DMUtils.getComputation(routine);
Map<String,String> outputFiles=DMUtils.getOutputFiles(id);
// if(outputFiles.containsKey(Local))
}
}

View File: ImporterImpl.java

@ -11,9 +11,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.gcube.application.perform.service.LocalConfiguration;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.Importer;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.dm.ImporterMonitor;
@ -22,6 +25,7 @@ import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
import org.gcube.application.perform.service.engine.model.importer.ImportRequest;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportStatus;
@ -40,7 +44,8 @@ public class ImporterImpl implements Importer {
private static final Logger log= LoggerFactory.getLogger(ImporterImpl.class);
@Inject
private PerformanceManager performance;
@ -67,15 +72,15 @@ public class ImporterImpl implements Importer {
try {
conn.setAutoCommit(true);
String hostname=getHostname();
DBField lockField=ImportRoutine.fields.get(ImportRoutine.LOCK);
PreparedStatement psOrphans=Queries.ORPHAN_IMPORTS.get(conn,new DBQueryDescriptor(lockField, hostname));
PreparedStatement psAcquire=Queries.ACQUIRE_IMPORT_ROUTINE.prepare(conn);
// set ps
ResultSet rsOrphans=psOrphans.executeQuery();
long monitoredCount=0l;
while(rsOrphans.next()) {
@ -127,13 +132,13 @@ public class ImporterImpl implements Importer {
private void monitor(ImportRoutineDescriptor desc) throws DMException {
log.debug("Monitoring {} ",desc);
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(desc));
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(performance,desc));
}
private ComputationId submit(ImportRequest request) throws DMException {
private ComputationId submit(ImportRequest request) throws DMException, InvalidRequestException {
/**
* dataminer-prototypes.d4science.org/wps/WebProcessingService?
* request=Execute&service=WPS&Version=1.0.0&gcube-token=***REMOVED***&lang=en-US&
@ -145,11 +150,15 @@ public class ImporterImpl implements Importer {
* FarmID=ID
*/
log.debug("Preparing DM Parameters from request : {} ",request);
Map<String,String> parameters=new HashMap<>();
parameters.put("InputData", request.getSource());
parameters.put("BatchType", request.getBatchType());
parameters.put("FarmID", request.getFarmId().toString());
try {
parameters.put("InputData", request.getSource());
parameters.put("BatchType", request.getBatchType());
parameters.put("FarmID", request.getFarmId().toString());
}catch(Throwable t) {
throw new InvalidRequestException("Invalid request : "+request,t);
}
return DMUtils.submitJob(LocalConfiguration.getProperty(LocalConfiguration.IMPORTER_COMPUTATION_ID), parameters);
}

View File: PerformanceManagerImpl.java

@ -1,8 +1,11 @@
package org.gcube.application.perform.service.engine.impl;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ -10,6 +13,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -23,6 +27,7 @@ import org.apache.commons.csv.CSVRecord;
import org.gcube.application.perform.service.LocalConfiguration;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
@ -39,16 +44,16 @@ import org.slf4j.LoggerFactory;
public class PerformanceManagerImpl implements PerformanceManager{
private static final Logger log= LoggerFactory.getLogger(PerformanceManagerImpl.class);
private static Map<AnalysisType,Set<ImportedTable>> analysisConfiguration=new HashMap<>();
public static Map<AnalysisType, Set<ImportedTable>> getAnalysisConfiguration() {
return analysisConfiguration;
}
@Override
public Map<String, String> generateCSV(CSVExportRequest request) throws SQLException, InvalidRequestException, InternalException, IOException {
log.trace("Serving {} ",request);
@ -66,7 +71,7 @@ public class PerformanceManagerImpl implements PerformanceManager{
}
@Override
public void loadOutputData(ImportRoutineDescriptor desc) throws SQLException, InvalidRequestException, InternalException, IOException{
public void loadOutputData(ImportRoutineDescriptor desc) throws SQLException, InvalidRequestException, InternalException, IOException, DMException{
log.info("Importing output for {} ",desc);
ComputationId computation=DMUtils.getComputation(desc);
Map<String,String> outputs=DMUtils.getOutputFiles(computation);
@ -75,10 +80,12 @@ public class PerformanceManagerImpl implements PerformanceManager{
for(Entry<String,String> entry:outputs.entrySet()) {
parse(entry.getValue(),entry.getKey(),desc,conn);
}
log.info("IMPORTED ALL FILES for {}, gonna clean previous routines output. ",desc);
log.debug("IMPORTED ALL FILES for {}, gonna clean previous routines output. ",desc);
removeOlderEquivalents(desc, conn);
log.debug("COMMITTING...");
conn.commit();
log.info("Successfully imported data for {} ",desc);
}finally {
conn.close();
}
@ -99,19 +106,19 @@ public class PerformanceManagerImpl implements PerformanceManager{
if(Boolean.parseBoolean(LocalConfiguration.getProperty(LocalConfiguration.COMMIT_SCHEMA)))
conn.commit();
}
public static void importSchema(SchemaDefinition schema,String csvBasePath) throws IOException, SQLException, InternalException {
log.info("Loading schema {} ",schema);
String actualCSVPath=csvBasePath+"/"+schema.getCsvPath();
log.debug("CSV path : {} ",actualCSVPath);
ArrayList<DBField> csvFieldsDefinition=getCSVFieldsDefinition(actualCSVPath);
ArrayList<DBField> csvFieldsDefinition=getCSVFieldsDefinition(actualCSVPath,schema);
AnalysisType analysisType=schema.getRelatedAnalysis();
String tablename=(analysisType.getId()+"_"+schema.getRelatedDescription()).toLowerCase().replaceAll(" ", "_");
@ -127,7 +134,7 @@ public class PerformanceManagerImpl implements PerformanceManager{
}
static Set<ImportedTable> getAnalysisSet(CSVExportRequest request) throws InvalidRequestException{
AnalysisType type=request.getType();
if(!analysisConfiguration.containsKey(type))
@ -137,15 +144,16 @@ public class PerformanceManagerImpl implements PerformanceManager{
private static final void removeOlderEquivalents(ImportRoutineDescriptor last,Connection conn) throws SQLException, InvalidRequestException {
log.debug("Removing imports replaced by {} ",last);
DBQueryDescriptor desc=new DBQueryDescriptor().
add(DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),last.getBatch_type()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),last.getSourceUrl()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.END),new Timestamp(last.getEndTime().toEpochMilli()));
add(DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),last.getBatch_type()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),last.getSourceUrl()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.ID),last.getId()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.END),new Timestamp(Instant.now().toEpochMilli()));
ResultSet rsEquivalents=Queries.GET_OLDER_EQUIVALENT_IMPORT_ROUTINE.get(conn, desc).executeQuery();
while(rsEquivalents.next()) {
ImportRoutineDescriptor older=Queries.rowToDescriptor(rsEquivalents);
log.debug("Removing outputs from {} ",older);
@ -155,17 +163,19 @@ public class PerformanceManagerImpl implements PerformanceManager{
table.cleanByImportRoutine(older,conn);
}
}
}
private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException, InvalidRequestException {
Reader in = new FileReader(path);
URL csvUrl = new URL(path);
BufferedReader in = new BufferedReader(new InputStreamReader(csvUrl.openStream()));
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
AnalysisType type=new AnalysisType(routine);
try {
log.debug("Parsing file {} : {} ",description,path);
// Extract CSV Schema
@ -205,11 +215,22 @@ public class PerformanceManagerImpl implements PerformanceManager{
private static final String INTEGER_REGEX="\\d*";
private static ArrayList<DBField> getCSVFieldsDefinition(String csvFile) throws IOException{
private static ArrayList<DBField> getCSVFieldsDefinition(String csvFile,SchemaDefinition schema) throws IOException{
Reader in = null;
CSVParser parser= null;
try {
HashSet<String> deanonimizationLabels=new HashSet<>();
if(schema.getAssociationUUIDField()!=null)
deanonimizationLabels.add(schema.getAssociationUUIDField());
if(schema.getFarmUUIDField()!=null)
deanonimizationLabels.add(schema.getFarmUUIDField());
if(schema.getBatchUUIDField()!=null)
deanonimizationLabels.add(schema.getBatchUUIDField());
if(schema.getCompanyUUIDField()!=null)
deanonimizationLabels.add(schema.getCompanyUUIDField());
in=new FileReader(csvFile);
parser=CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
Map<String,Integer> headers=parser.getHeaderMap();
@ -219,10 +240,15 @@ public class PerformanceManagerImpl implements PerformanceManager{
for(Entry<String,Integer> header:headers.entrySet()) {
String value=record.get(header.getKey());
String name=header.getKey();
int type=Types.VARCHAR;
if(value.matches(FLOAT_REGEX)) type=Types.REAL;
else if(value.matches(INTEGER_REGEX)) type=Types.BIGINT;
toReturn.add(new DBField(type, header.getKey()));
// De-anonymized fields will always contain strings
if(!deanonimizationLabels.contains(name)) {
// NB: INT is managed as REAL in order to deal with the DataMiner output format
if(value.matches(FLOAT_REGEX)||value.matches(INTEGER_REGEX)) type=Types.REAL;
}
toReturn.add(new DBField(type, name));
}
return toReturn;
@ -231,6 +257,6 @@ public class PerformanceManagerImpl implements PerformanceManager{
if(parser!=null) parser.close();
}
}
}
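A condensed sketch of the revised column-type inference in getCSVFieldsDefinition (the regex constants below are illustrative stand-ins for the ones declared in PerformanceManagerImpl; the de-anonymization labels come from the SchemaDefinition getters used above):
import java.sql.Types;
import java.util.Set;
class ColumnTypeInferenceSketch {
	// Illustrative patterns; the real FLOAT_REGEX/INTEGER_REGEX are defined in PerformanceManagerImpl.
	private static final String FLOAT_REGEX = "-?\\d+\\.\\d+";
	private static final String INTEGER_REGEX = "\\d*";
	// De-anonymized columns always stay VARCHAR; any numeric-looking sample value maps to REAL,
	// because integers in the DataMiner output are handled as reals.
	static int inferSqlType(String columnLabel, String sampleValue, Set<String> deanonimizationLabels) {
		int type = Types.VARCHAR;
		if (!deanonimizationLabels.contains(columnLabel)
				&& (sampleValue.matches(FLOAT_REGEX) || sampleValue.matches(INTEGER_REGEX)))
			type = Types.REAL;
		return type;
	}
}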

View File: Queries.java

@ -39,9 +39,11 @@ public class Queries {
public static final Query GET_OLDER_EQUIVALENT_IMPORT_ROUTINE=new Query("Select * from "+ImportRoutine.TABLE+" WHERE "
+ImportRoutine.BATCH_TYPE+"=? AND "
+ImportRoutine.SOURCE_URL+"=? AND "
+ImportRoutine.END+"<?",
+ImportRoutine.ID+"<>? AND "
+"("+ImportRoutine.END+"<? OR "+ImportRoutine.END+" IS NULL) ",
new DBField[]{DBField.ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE),
DBField.ImportRoutine.fields.get(ImportRoutine.SOURCE_URL),
DBField.ImportRoutine.fields.get(ImportRoutine.ID),
DBField.ImportRoutine.fields.get(ImportRoutine.END)});

View File: ImportRequest.java

@ -48,6 +48,13 @@ public class ImportRequest {
public void setFarmId(Long farmId) {
this.farmId = farmId;
}
@Override
public String toString() {
return "ImportRequest [source=" + source + ", version=" + version + ", batchType=" + batchType + ", farmId="
+ farmId + "]";
}
}

View File: ImportRoutineDescriptor.java

@ -146,6 +146,17 @@ public class ImportRoutineDescriptor {
public void setComputationRequest(String computationRequest) {
this.computationRequest = computationRequest;
}
@Override
public String toString() {
return "ImportRoutineDescriptor [id=" + id + ", farmId=" + farmId + ", batch_type=" + batch_type
+ ", sourceUrl=" + sourceUrl + ", sourceVersion=" + sourceVersion + ", startTime=" + startTime
+ ", endTime=" + endTime + ", status=" + status + ", lock=" + lock + ", caller=" + caller
+ ", computationId=" + computationId + ", computationUrl=" + computationUrl + ", computationOperator="
+ computationOperator + ", computationOperatorName=" + computationOperatorName + ", computationRequest="
+ computationRequest + "]";
}
}

View File: ImportedTable.java

@ -149,13 +149,18 @@ public class ImportedTable {
for(Entry<String,String> csvField:csvRow.entrySet()) {
DBField toSetField=labels.get(csvField.getKey());
Object value=csvField.getValue();
Object value=csvField.getValue();
if(value!=null)
switch(toSetField.getType()) {
case Types.BIGINT : value=Long.parseLong((String) value);
break;
case Types.REAL : value=Double.parseDouble((String) value);
break;
try {
switch(toSetField.getType()) {
case Types.BIGINT : value=Long.parseLong((String) value);
break;
case Types.REAL : value=Double.parseDouble((String) value);
break;
}
}catch(NumberFormatException e) {
log.error("Unable to parse field {} value was {} ",csvField.getKey(),csvField.getValue());
throw e;
}
desc.add(toSetField, value);
}

View File: Import.java

@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
@ManagedBy(PerformServiceManager.class)
public class Import {
private static final Logger log= LoggerFactory.getLogger(Mappings.class);
private static final Logger log= LoggerFactory.getLogger(Import.class);
@Inject
private Importer importer;
@ -44,15 +44,23 @@ public class Import {
* @param farmid
* @param sourceFile
*/
@POST
public void importExcel(@QueryParam(ServiceConstants.Import.BATCH_TYPE_PARAMETER)String batchType,
@POST
@Produces(MediaType.APPLICATION_JSON)
public ImportRoutineDescriptor importExcel(@QueryParam(ServiceConstants.Import.BATCH_TYPE_PARAMETER)String batchType,
@QueryParam(ServiceConstants.Import.FARM_ID_PARAMETER)Long farmid,
@QueryParam(ServiceConstants.Import.EXCEL_FILE_PARAMETER)String sourceFile,
@QueryParam(ServiceConstants.Import.EXCEL_FILE_VERSION_PARAMETER)String sourceVersion) {
InterfaceCommons.checkMandatory(batchType, ServiceConstants.Import.BATCH_TYPE_PARAMETER);
InterfaceCommons.checkMandatory(farmid, ServiceConstants.Import.FARM_ID_PARAMETER);
InterfaceCommons.checkMandatory(sourceFile, ServiceConstants.Import.EXCEL_FILE_PARAMETER);
InterfaceCommons.checkMandatory(sourceVersion, ServiceConstants.Import.EXCEL_FILE_VERSION_PARAMETER);
ImportRequest req=new ImportRequest(sourceFile, sourceVersion, batchType, farmid);
try {
importer.importExcel(new ImportRequest(sourceFile, sourceVersion, batchType, farmid));
return importer.importExcel(req);
}catch(Throwable t) {
log.warn("Unexpected Exception on IMPORT ",t);
log.warn("Unexpected Exception on IMPORT "+req,t);
throw new WebApplicationException("Unexpected Exception.", t,Response.Status.INTERNAL_SERVER_ERROR);
}
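For reference, a minimal JAX-RS client call against the updated endpoint, which now returns the ImportRoutineDescriptor of the accepted routine as JSON. The host and query parameter names match the ones exercised in ImportTests below; the source file URL is an illustrative placeholder:
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
class ImportClientSketch {
	public static void main(String[] args) {
		Client client = ClientBuilder.newClient();
		WebTarget target = client.target("http://perform.dev.d4science.org/perform-service/gcube/service/import")
				.queryParam("batch_type", "HATCHERY_INDIVIDUAL")
				.queryParam("farmid", 12682101L)
				.queryParam("source", "<excel-file-url>") // illustrative placeholder
				.queryParam("source_version", "1.1");
		Response resp = target.request().post(null);
		// On success the body is the JSON serialization of the ImportRoutineDescriptor
		System.out.println(resp.getStatus() + " : " + resp.readEntity(String.class));
	}
}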

View File

@ -0,0 +1 @@
,fabio,pc-fabio,25.01.2019 16:45,file:///home/fabio/.config/libreoffice/4;

View File: DMTests.java

@ -0,0 +1,68 @@
package org.gcube.application.perform.service;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.data.analysis.dataminermanagercl.server.monitor.DMMonitorListener;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
public class DMTests {
public static void main(String[] args) throws DMException, MalformedURLException {
Map<String,String> params=new HashMap<String,String>();
TokenSetter.set("/gcube/preprod/preVRE");
LocalConfiguration.init(Paths.get("src/main/webapp/WEB-INF/config.properties").toUri().toURL());
ComputationId compId=DMUtils.submitJob(LocalConfiguration.getProperty(LocalConfiguration.IMPORTER_COMPUTATION_ID), params);
final Semaphore sem=new Semaphore(0);
DMUtils.monitor(compId, new DMMonitorListener() {
@Override
public void running(double percentage) {
System.out.println("RUN");
}
@Override
public void failed(String message, Exception exception) {
System.out.println("FAIL");
sem.release();
}
@Override
public void complete(double percentage) {
System.out.println("DONE");
sem.release();
}
@Override
public void cancelled() {
System.out.println("CANC");
sem.release();
}
@Override
public void accepted() {
System.out.println("ACCEPTED");
}
});
System.out.println("WAITING FOR MONITOR");
try {
sem.acquire();
} catch (InterruptedException e) {
}
System.out.println("WOKE UP");
}
}

View File: DeleteSchema.java

@ -0,0 +1,40 @@
package org.gcube.application.perform.service;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map.Entry;
import java.util.Set;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.impl.PerformanceManagerImpl;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
import org.gcube.application.perform.service.engine.model.importer.ImportedTable;
public class DeleteSchema {
public static void main(String[] args) throws SQLException, IOException, InternalException {
LocalConfiguration.init(Paths.get("src/main/webapp/WEB-INF/config.properties").toUri().toURL());
PerformServiceLifecycleManager.initSchema("src/main/webapp/WEB-INF");
TokenSetter.set("/gcube/preprod/preVRE");
Connection conn=DataBaseManager.get().getConnection();
Statement stmt=conn.createStatement();
for(Entry<AnalysisType,Set<ImportedTable>> entry:PerformanceManagerImpl.getAnalysisConfiguration().entrySet()) {
for(ImportedTable t:entry.getValue()) {
stmt.execute("DROP TABLE "+t.getTableName());
}
}
conn.commit();
// throw new RuntimeException("Uncomment commit to really perform cleanup");
}
}

View File: ImportTests.java

@ -7,18 +7,33 @@ import org.junit.Test;
public class ImportTests extends CommonTest{
Long farmid=12682101l;
@Test
public void submit() {
/**
* http://perform.dev.d4science.org/perform-service/gcube/service/import?
* batch_type=HATCHERY_INDIVIDUAL&
* farmid=12682101&
* source=https://data1-d.d4science.org/shub/E_aUJUbDNzeUlLL29KL2xlZUloWFQ5TEdlZ0ZnZzlNNTVLNUEzeDVRNFVoVHlLMW5DVG5RbVVXVzlYeUUzZWFXRA==
* source_version=1.1
*/
WebTarget target=
target(ServiceConstants.Import.PATH).
queryParam(ServiceConstants.Import.BATCH_TYPE_PARAMETER, "pino").
queryParam(ServiceConstants.Import.FARM_ID_PARAMETER, 12682549).
queryParam(ServiceConstants.Import.EXCEL_FILE_PARAMETER, "gino").
queryParam(ServiceConstants.Import.EXCEL_FILE_VERSION_PARAMETER, "gino2.1");
queryParam(ServiceConstants.Import.BATCH_TYPE_PARAMETER, "HATCHERY_INDIVIDUAL").
queryParam(ServiceConstants.Import.FARM_ID_PARAMETER, farmid).
queryParam(ServiceConstants.Import.EXCEL_FILE_PARAMETER, "https://data1-d.d4science.org/shub/E_aUJUbDNzeUlLL29KL2xlZUloWFQ5TEdlZ0ZnZzlNNTVLNUEzeDVRNFVoVHlLMW5DVG5RbVVXVzlYeUUzZWFXRA==").
queryParam(ServiceConstants.Import.EXCEL_FILE_VERSION_PARAMETER, "1.1");
System.out.println(target.getUri());
Response resp=target.request().post(null);
System.out.println("Status : "+resp.getStatus() );
System.out.println("Status : "+resp.getStatus() );
try {
Thread.sleep(1000*60*10);
} catch (InterruptedException e) {
}
}

View File: LoadSchemaTest.java

@ -3,19 +3,23 @@ package org.gcube.application.perform.service;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.impl.ExportCSVQuery;
import org.gcube.application.perform.service.engine.dm.DMException;
import org.gcube.application.perform.service.engine.impl.ImporterImpl;
import org.gcube.application.perform.service.engine.impl.PerformanceManagerImpl;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.impl.Queries;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
public class LoadSchemaTest {
public static void main(String[] args) throws MalformedURLException, IOException, SQLException, InternalException {
public static void main(String[] args) throws MalformedURLException, IOException, SQLException, InternalException, DMException {
LocalConfiguration.init(Paths.get("src/main/webapp/WEB-INF/config.properties").toUri().toURL());
PerformServiceLifecycleManager.initSchema("src/main/webapp/WEB-INF");
@ -25,13 +29,32 @@ public class LoadSchemaTest {
PerformanceManager mng =new PerformanceManagerImpl();
CSVExportRequest req=new CSVExportRequest(new AnalysisType("GROW_OUT_AGGREGATED", "GROW_OUT_AGGREGATED"));
req.addAreas(Arrays.asList("A1","A2"));
req.addFarmId(12682549l);
req.addPeriods(Arrays.asList("p1"));
req.addSpecies(Arrays.asList("Gadidae"));
System.out.println(mng.generateCSV(req));
/**
* [id=26,
* farmId=12682101,
* batch_type=HATCHERY_INDIVIDUAL,
* sourceUrl=https://data1-d.d4science.org/shub/E_aUJUbDNzeUlLL29KL2xlZUloWFQ5TEdlZ0ZnZzlNNTVLNUEzeDVRNFVoVHlLMW5DVG5RbVVXVzlYeUUzZWFXRA==, sourceVersion=1.1, startTime=2019-01-25T14:52:31.442Z, endTime=null, status=ACCEPTED, lock=localhost, caller=MSgXVCkHb0SDoQLlCLQV9Kj8dPJlJ6gY+XicZJhenQkyuxA11kGUIdhxKs3jUdGK, computationId=7c6f5d7e-b778-42be-af20-b99229fa99ee, computationUrl=http://dataminer1-pre.d4science.org:80//wps/RetrieveResultServlet?id=7c6f5d7e-b778-42be-af20-b99229fa99ee, computationOperator=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.PERFORMFISH_DATA_EXTRACTOR, computationOperatorName=Performfish Data Extractor, computationRequest=https://dataminer1-pre.d4science.org/wps/WebProcessingService?request=Execute&service=WPS&Version=1.0.0&gcube-token=***REMOVED***&lang=en-US&Identifier=org.gcube.dataanalysis.wps.statisticalmanager.synchserver.mappedclasses.transducerers.PERFORMFISH_DATA_EXTRACTOR&DataInputs=InputData=https%3A%2F%2Fdata1-d.d4science.org%2Fshub%2FE_aUJUbDNzeUlLL29KL2xlZUloWFQ5TEdlZ0ZnZzlNNTVLNUEzeDVRNFVoVHlLMW5DVG5RbVVXVzlYeUUzZWFXRA%3D%3D;BatchType=HATCHERY_INDIVIDUAL;FarmID=12682101;]
*/
ResultSet rs=Queries.GET_IMPORT_ROUTINE_BY_ID.get(DataBaseManager.get().getConnection(), new DBQueryDescriptor(DBField.ImportRoutine.fields.get(DBField.ImportRoutine.ID),26l)).executeQuery();
rs.next();
Queries.rowToDescriptor(rs);
ImportRoutineDescriptor desc=Queries.rowToDescriptor(rs);
mng.loadOutputData(desc);
//
// CSVExportRequest req=new CSVExportRequest(new AnalysisType("GROW_OUT_AGGREGATED", "GROW_OUT_AGGREGATED"));
// req.addAreas(Arrays.asList("A1","A2"));
// req.addFarmId(12682549l);
// req.addPeriods(Arrays.asList("p1"));
// req.addSpecies(Arrays.asList("Gadidae"));
//
// System.out.println(mng.generateCSV(req));
}
}