Lucio Lelii 2017-09-22 14:37:28 +00:00
parent c5e12d5439
commit df9a34cff0
4 changed files with 131 additions and 107 deletions

View File

@ -75,6 +75,15 @@
<groupId>org.gcube.core</groupId> <groupId>org.gcube.core</groupId>
<artifactId>common-scope</artifactId> <artifactId>common-scope</artifactId>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -12,6 +12,8 @@ import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.common.scope.api.ScopeProvider; import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.impl.XQuery; import org.gcube.resources.discovery.client.queries.impl.XQuery;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -32,16 +34,27 @@ public abstract class StandardLocalInfraAlgorithm extends StandardLocalExternalA
} }
@SuppressWarnings("unchecked")
private boolean sendMessage(String token, String email, String subject, String body) throws Exception { private boolean sendMessage(String token, String email, String subject, String body) throws Exception {
String socialServiceEnpoint = retrieveSocialServiceEnpoint(); String socialServiceEnpoint = retrieveSocialServiceEnpoint();
LOGGER.debug("contacting social service endpoint {}",socialServiceEnpoint+SEND_MESSAGE_METHOD+"?gcube-token="+token);
PostMethod putMessage = new PostMethod(socialServiceEnpoint+SEND_MESSAGE_METHOD+"?gcube-token="+token); PostMethod putMessage = new PostMethod(socialServiceEnpoint+SEND_MESSAGE_METHOD+"?gcube-token="+token);
String jsonRequest = String.format("{\"subject\":\"%s\", \"body\":\"%s\", \"recipients\":[\"%s\"]}",subject,body, email);
JSONObject obj = new JSONObject();
obj.put("subject", subject);
obj.put("body", body);
JSONArray list = new JSONArray();
list.add(email);
obj.put("recipients", list);
//String jsonRequest = String.format("{\"subject\":\"%s\", \"body\":\"%s\", \"recipients\":[{\"id\":\"%s\"}]}",subject,body, email);
putMessage.setRequestEntity(new StringRequestEntity(jsonRequest, "application/json" , "UTF-8")); putMessage.setRequestEntity(new StringRequestEntity(obj.toJSONString(), "application/json" , "UTF-8"));
LOGGER.debug("json request is {}", jsonRequest); LOGGER.debug("json request is {}", obj.toJSONString());
HttpClient httpClient = new HttpClient(); HttpClient httpClient = new HttpClient();

View File

@ -13,17 +13,18 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler; import org.gcube.contentmanager.storageclient.model.protocol.smp.Handler;
import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode; import org.gcube.dataanalysis.ecoengine.interfaces.ActorNode;
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm; import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalInfraAlgorithm;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GenericWorker extends StandardLocalInfraAlgorithm{ public class GenericWorker extends StandardLocalInfraAlgorithm{
private static String genericWorkerDir = "/genericworker/"; private static String genericWorkerDir = "/genericworker/";
private static Logger logger = LoggerFactory.getLogger(GenericWorker.class);
public static String AlgorithmClassParameter = "AlgorithmClass"; public static String AlgorithmClassParameter = "AlgorithmClass";
public static String RightSetStartIndexParameter = "RightSetStartIndex"; public static String RightSetStartIndexParameter = "RightSetStartIndex";
public static String NumberOfRightElementsToProcessParameter = "NumberOfRightElementsToProcess"; public static String NumberOfRightElementsToProcessParameter = "NumberOfRightElementsToProcess";
@ -33,12 +34,12 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
public static String SessionParameter = "Session"; public static String SessionParameter = "Session";
public static String ConfigurationFileParameter = "ConfigurationFile"; public static String ConfigurationFileParameter = "ConfigurationFile";
public static String DeleteTemporaryFilesParameter = "DeleteTemporaryFiles"; public static String DeleteTemporaryFilesParameter = "DeleteTemporaryFiles";
public static String OutputParameter = "Process_Outcome"; public static String OutputParameter = "Process_Outcome";
public static String TASK_SUCCESS = "TASK_SUCCESS"; public static String TASK_SUCCESS = "TASK_SUCCESS";
public static String TASK_FAILURE = "TASK_FAILURE"; public static String TASK_FAILURE = "TASK_FAILURE";
public static String TASK_UNDEFINED = "TASK_UNDEFINED"; public static String TASK_UNDEFINED = "TASK_UNDEFINED";
private static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException { private static void inputStreamToFile(InputStream is, String path) throws FileNotFoundException, IOException {
FileOutputStream out = new FileOutputStream(new File(path)); FileOutputStream out = new FileOutputStream(new File(path));
byte buf[] = new byte[1024]; byte buf[] = new byte[1024];
@ -47,7 +48,7 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
out.write(buf, 0, len); out.write(buf, 0, len);
out.close(); out.close();
} }
public void executeAlgorithm(String algorithmClass, public void executeAlgorithm(String algorithmClass,
int rightStartIndex, int rightStartIndex,
int numberOfRightElementsToProcess, int numberOfRightElementsToProcess,
@ -57,88 +58,86 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
String session, String session,
File nodeConfigurationFileObject, File nodeConfigurationFileObject,
boolean deleteFiles) throws Exception { boolean deleteFiles) throws Exception {
PrintStream origOut = System.out; PrintStream origOut = System.out;
PrintStream origErr = System.err; PrintStream origErr = System.err;
status = 0f; status = 0f;
Logger logger = AnalysisLogger.getLogger();
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
File tempDir = null ; File tempDir = null ;
try { try {
Handler.activateProtocol(); Handler.activateProtocol();
// invoke the algorithm // invoke the algorithm
logger.debug("GenericWorker-> Creating algorithm " + algorithmClass); logger.debug("GenericWorker-> Creating algorithm " + algorithmClass);
ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance(); ActorNode node = (ActorNode) Class.forName(algorithmClass).newInstance();
logger.debug("GenericWorker-> executing algorithm " + algorithmClass +" with parameters:"); logger.debug("GenericWorker-> executing algorithm " + algorithmClass +" with parameters:");
logger.debug("GenericWorker-> rightStartIndex:" +rightStartIndex); logger.debug("GenericWorker-> rightStartIndex:" +rightStartIndex);
logger.debug("GenericWorker-> numberOfRightElementsToProcess:" +numberOfRightElementsToProcess); logger.debug("GenericWorker-> numberOfRightElementsToProcess:" +numberOfRightElementsToProcess);
logger.debug("GenericWorker-> leftStartIndex:" +leftStartIndex); logger.debug("GenericWorker-> leftStartIndex:" +leftStartIndex);
logger.debug("GenericWorker-> numberOfLeftElementsToProcess:" +numberOfLeftElementsToProcess); logger.debug("GenericWorker-> numberOfLeftElementsToProcess:" +numberOfLeftElementsToProcess);
logger.debug("GenericWorker-> isduplicate:" +isduplicate); logger.debug("GenericWorker-> isduplicate:" +isduplicate);
logger.debug("GenericWorker-> execution directory:" +config.getConfigPath()); logger.debug("GenericWorker-> execution directory:" +config.getConfigPath());
logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName()); logger.debug("GenericWorker-> nodeConfigurationFileObject.getName():" +nodeConfigurationFileObject.getName());
logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath()); logger.debug("GenericWorker-> nodeConfigurationFileObject.getPath():" +nodeConfigurationFileObject.getAbsolutePath());
logger.debug("GenericWorker-> session :" +session); logger.debug("GenericWorker-> session :" +session);
logger.debug("GenericWorker-> delete files :" +deleteFiles); logger.debug("GenericWorker-> delete files :" +deleteFiles);
File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName()); File sandboxfile = new File(config.getConfigPath(),nodeConfigurationFileObject.getName());
Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING); Files.copy(nodeConfigurationFileObject.toPath(), sandboxfile.toPath(), REPLACE_EXISTING);
logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath()); logger.debug("GenericWorker-> copied configuration file as " +sandboxfile.getAbsolutePath());
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos); PrintStream ps = new PrintStream(baos);
System.setOut(ps); System.setOut(ps);
System.setErr(ps); System.setErr(ps);
int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate, int result = node.executeNode(leftStartIndex, numberOfLeftElementsToProcess, rightStartIndex, numberOfRightElementsToProcess, isduplicate,
config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt"); config.getConfigPath(), nodeConfigurationFileObject.getName(), "log.txt");
System.setOut(origOut); System.setOut(origOut);
System.setErr(origErr); System.setErr(origErr);
String log = new String(baos.toByteArray(), StandardCharsets.UTF_8); String log = new String(baos.toByteArray(), StandardCharsets.UTF_8);
//manage known issues //manage known issues
/* /*
log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except"); log=log.replace(".XMLStreamException: Unbound namespace URI", "Known Except");
log=log.replace("java.io.IOException: Error copying XML", "Known Except"); log=log.replace("java.io.IOException: Error copying XML", "Known Except");
log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except"); log=log.replace("java.io.FileNotFoundException: /home/gcube/tomcat/tmp/ConfigurationFile", "Known Except");
log=log.replace("java.io.FileNotFoundException: payload was not made available for this dataset", "Known Except"); log=log.replace("java.io.FileNotFoundException: payload was not made available for this dataset", "Known Except");
logger.debug("GenericWorker-> Execution Fulllog" ); logger.debug("GenericWorker-> Execution Fulllog" );
logger.debug("GenericWorker-> " + log); logger.debug("GenericWorker-> " + log);
logger.debug("GenericWorker-> Script executed! " ); logger.debug("GenericWorker-> Script executed! " );
*/ */
boolean del = sandboxfile.delete(); boolean del = sandboxfile.delete();
logger.debug("GenericWorker-> deleted sandbox file: "+del ); logger.debug("GenericWorker-> deleted sandbox file: "+del );
logger.debug("GenericWorker-> all done"); logger.debug("GenericWorker-> all done");
//if (log.contains("Exception:")){ //if (log.contains("Exception:")){
if (result!= 0){ if (result!= 0){
outputParameters.put(OutputParameter, TASK_FAILURE); outputParameters.put(OutputParameter, TASK_FAILURE);
String cutLog = URLEncoder.encode(log, "UTF-8"); String cutLog = URLEncoder.encode(log, "UTF-8");
/* /*
int maxlen = 20240; int maxlen = 20240;
if (log.length()>maxlen) if (log.length()>maxlen)
cutLog = cutLog.substring(0,maxlen)+"..."; cutLog = cutLog.substring(0,maxlen)+"...";
*/ */
cutLog = log; cutLog = log;
outputParameters.put("Log", cutLog); outputParameters.put("Log", cutLog);
logger.debug("GenericWorker-> Failure!"); logger.debug("GenericWorker-> Failure!");
} }
else{ else{
outputParameters.put(OutputParameter, TASK_SUCCESS); outputParameters.put(OutputParameter, TASK_SUCCESS);
logger.debug("GenericWorker-> Success!"); logger.debug("GenericWorker-> Success!");
} }
} catch (Throwable e) { } catch (Throwable e) {
outputParameters.put(OutputParameter, TASK_FAILURE); outputParameters.put(OutputParameter, TASK_FAILURE);
outputParameters.put("Log", e.getLocalizedMessage()); outputParameters.put("Log", e.getLocalizedMessage());
e.printStackTrace(); logger.error("GenericWorker-> ERROR: " ,e);
logger.debug("GenericWorker-> ERROR: " + e.getLocalizedMessage());
status = 100f; status = 100f;
throw new Exception(e.getLocalizedMessage()); throw new Exception(e.getLocalizedMessage());
} }
@ -146,25 +145,24 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
System.setOut(origOut); System.setOut(origOut);
System.setErr(origErr); System.setErr(origErr);
try{ try{
if (deleteFiles && (tempDir!=null)) { if (deleteFiles && (tempDir!=null)) {
logger.debug("GenericWorker-> ... deleting local files"); logger.debug("GenericWorker-> ... deleting local files");
// delete all after execution // delete all after execution
for (File singlefile : tempDir.listFiles()) { for (File singlefile : tempDir.listFiles()) {
boolean del = singlefile.delete(); boolean del = singlefile.delete();
if (!del) if (!del)
logger.debug("GenericWorker-> ERROR deleting " + singlefile.getName() + " " + del); logger.debug("GenericWorker-> ERROR deleting " + singlefile.getName() + " " + del);
else else
logger.debug("GenericWorker-> deleted LOCAL FILE " + singlefile.getName() + " " + del); logger.debug("GenericWorker-> deleted LOCAL FILE " + singlefile.getName() + " " + del);
}
logger.debug("GenericWorker-> deleting temporary directory");
tempDir.delete();
} }
logger.debug("GenericWorker-> deleting temporary directory"); if (nodeConfigurationFileObject!=null && nodeConfigurationFileObject.exists())
tempDir.delete(); nodeConfigurationFileObject.delete();
}
if (nodeConfigurationFileObject!=null && nodeConfigurationFileObject.exists())
nodeConfigurationFileObject.delete();
}catch(Exception e3){ }catch(Exception e3){
e3.printStackTrace(); logger.warn("GenericWorker-> Error deleting files",e3);
logger.debug("GenericWorker-> Error deleting files");
} }
status = 100f; status = 100f;
} }
@ -172,8 +170,8 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
@Override @Override
public void init() throws Exception { public void init() throws Exception {
} }
@Override @Override
@ -183,10 +181,10 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
@Override @Override
protected void process() throws Exception { protected void process() throws Exception {
AnalysisLogger.getLogger().debug("Parameters: "+config.getGeneralProperties()); logger.debug("Parameters: "+config.getGeneralProperties());
String algorithmClass = config.getParam(AlgorithmClassParameter); String algorithmClass = config.getParam(AlgorithmClassParameter);
int rightStartIndex = Integer.parseInt(config.getParam(RightSetStartIndexParameter)); int rightStartIndex = Integer.parseInt(config.getParam(RightSetStartIndexParameter));
int numberOfRightElementsToProcess =Integer.parseInt(config.getParam(NumberOfRightElementsToProcessParameter)); int numberOfRightElementsToProcess =Integer.parseInt(config.getParam(NumberOfRightElementsToProcessParameter));
int leftStartIndex =Integer.parseInt(config.getParam(LeftSetStartIndexParameter)); int leftStartIndex =Integer.parseInt(config.getParam(LeftSetStartIndexParameter));
@ -195,16 +193,16 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
String session=config.getParam(SessionParameter); String session=config.getParam(SessionParameter);
File nodeConfigurationFileObject=new File (config.getParam(ConfigurationFileParameter)); File nodeConfigurationFileObject=new File (config.getParam(ConfigurationFileParameter));
boolean deleteFiles= Boolean.parseBoolean(config.getParam(DeleteTemporaryFilesParameter)); boolean deleteFiles= Boolean.parseBoolean(config.getParam(DeleteTemporaryFilesParameter));
AnalysisLogger.getLogger().debug("Executing the algorithm"); logger.debug("Executing the algorithm");
executeAlgorithm(algorithmClass, rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, session, nodeConfigurationFileObject, deleteFiles); executeAlgorithm(algorithmClass, rightStartIndex, numberOfRightElementsToProcess, leftStartIndex, numberOfLeftElementsToProcess, isduplicate, session, nodeConfigurationFileObject, deleteFiles);
AnalysisLogger.getLogger().debug("Algorithm executed!"); logger.debug("Algorithm executed!");
} }
@Override @Override
protected void setInputParameters() { protected void setInputParameters() {
addStringInput(AlgorithmClassParameter, "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer"); addStringInput(AlgorithmClassParameter, "The full class path of the algorithm", "org.gcube.dataanalysis.executor.nodes.transducers.bionym.BionymFlexibleWorkflowTransducer");
addIntegerInput(RightSetStartIndexParameter, "The start index of the right set in a cartesian product of the input", "1"); addIntegerInput(RightSetStartIndexParameter, "The start index of the right set in a cartesian product of the input", "1");
addIntegerInput(NumberOfRightElementsToProcessParameter, "The number of elements to process in the right set", "1"); addIntegerInput(NumberOfRightElementsToProcessParameter, "The number of elements to process in the right set", "1");
@ -218,7 +216,7 @@ public class GenericWorker extends StandardLocalInfraAlgorithm{
@Override @Override
public void shutdown() { public void shutdown() {
} }
} }

View File

@ -2,8 +2,6 @@ package org.gcube.dataanalysis.executor.nodes.transducers;
import java.io.File; import java.io.File;
import java.util.List; import java.util.List;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS; import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration; import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType; import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
@ -12,9 +10,13 @@ import org.gcube.dataanalysis.ecoengine.transducers.OccurrencePointsMerger;
import org.gcube.dataanalysis.ecoengine.utils.Transformations; import org.gcube.dataanalysis.ecoengine.utils.Transformations;
import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing; import org.gcube.dataanalysis.executor.generators.D4ScienceDistributedProcessing;
import org.hibernate.SessionFactory; import org.hibernate.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OccurrenceMergingNode extends ActorNode { public class OccurrenceMergingNode extends ActorNode {
private static Logger logger = LoggerFactory.getLogger(OccurrenceMergingNode.class);
// variables // variables
protected AlgorithmConfiguration currentconfig; protected AlgorithmConfiguration currentconfig;
protected SessionFactory dbHibConnection; protected SessionFactory dbHibConnection;
@ -69,12 +71,14 @@ public class OccurrenceMergingNode extends ActorNode {
try{ try{
status = 0; status = 0;
AlgorithmConfiguration config = Transformations.restoreConfig(new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath()); String configFileAbsolutePath = new File(sandboxFolder,nodeConfigurationFileObject).getAbsolutePath();
logger.debug("config file absolute path is {}", configFileAbsolutePath);
AlgorithmConfiguration config = Transformations.restoreConfig(configFileAbsolutePath);
config.setConfigPath(sandboxFolder); config.setConfigPath(sandboxFolder);
processor.setConfiguration(config); processor.setConfiguration(config);
AnalysisLogger.getLogger().info("Initializing variables"); logger.info("Initializing variables");
processor.init(); processor.init();
AnalysisLogger.getLogger().info("Initializing DB"); logger.info("Initializing DB");
processor.initDB(false); processor.initDB(false);
status = 0.5f; status = 0.5f;
processor.takeRange(leftStartIndex, numberOfLeftElementsToProcess, rightStarIndex, numberOfRightElementsToProcess); processor.takeRange(leftStartIndex, numberOfLeftElementsToProcess, rightStarIndex, numberOfRightElementsToProcess);
@ -123,7 +127,7 @@ public class OccurrenceMergingNode extends ActorNode {
try { try {
processor.postProcess(); processor.postProcess();
} catch (Exception e) { } catch (Exception e) {
AnalysisLogger.getLogger().info("Postprocessing Inapplicable"); logger.info("Postprocessing Inapplicable");
} }
} }