Gianpaolo Coro 2016-09-23 08:39:21 +00:00
parent a1cb85737b
commit 3f5287800f
1 changed file with 105 additions and 96 deletions


@@ -238,7 +238,10 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
for (File file : generatedFiles) {
if (file.exists()) {
AnalysisLogger.getLogger().debug("Deleting File " + file.getAbsolutePath());
- try{AnalysisLogger.getLogger().debug("Deleting File Check " + file.delete());}catch(Exception e){}
+ try {
+ AnalysisLogger.getLogger().debug("Deleting File Check " + file.delete());
+ } catch (Exception e) {
+ }
} else
AnalysisLogger.getLogger().debug("Deleting File - File does not exist " + file.getAbsolutePath());
}
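For comparison, a minimal sketch of the same cleanup written against java.nio (illustration only, not part of this commit; it assumes Java 7+ and the same List<File> generatedFiles used above). Files.deleteIfExists distinguishes a missing file (returns false) from a failed deletion (throws IOException), so the reason is not silently lost:

    for (File file : generatedFiles) {
        try {
            // illustration only: java.nio variant of the deletion loop above
            boolean deleted = java.nio.file.Files.deleteIfExists(file.toPath());
            AnalysisLogger.getLogger().debug("Deleting File Check " + deleted);
        } catch (java.io.IOException e) {
            AnalysisLogger.getLogger().debug("Deleting File - could not delete " + file.getAbsolutePath() + ": " + e.getMessage());
        }
    }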
@@ -249,58 +252,72 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
public void manageUserToken() {
String scope = null;
String username = null;
String token = null;
// DONE get scope and username from SmartGears
// get scope from SmartGears
TokenManager tokenm = new TokenManager();
tokenm.getCredentials();
scope = tokenm.getScope();
username = tokenm.getUserName();
token = tokenm.getToken();
// set parameters
inputs.put(ConfigurationManager.scopeParameter, scope);
inputs.put(ConfigurationManager.usernameParameter, username);
inputs.put(ConfigurationManager.tokenParameter, token);
}
long statusInterrupt = 0;
float previousStatus = -3;
public void updateStatus(float status) {
if (agent != null) {
- long stream =0;
- try{
+ long stream = 0;
+ try {
stream = org.n52.wps.server.database.DatabaseFactory.getDatabase().getContentLengthForStoreResponse(wpsExternalID);
- //AnalysisLogger.getLogger().debug("STATUS bytes " + stream + " interrupt bytes " + statusInterrupt);
- if (statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468) {
- statusInterrupt = stream;
- } else {
- AnalysisLogger.getLogger().debug("STATUS INTERRUPTED!");
- agent.shutdown();
- statusInterrupt = -1;
- agent = null;
- status = -1f;
- super.update(new Integer((int) status));
- updateComputationOnWS(status, null);
- System.gc();
+ // AnalysisLogger.getLogger().debug("STATUS bytes " + stream + " interrupt bytes " + statusInterrupt);
+ if (statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468) {
+ statusInterrupt = stream;
+ } else {
+ AnalysisLogger.getLogger().debug("STATUS INTERRUPTED!");
+ agent.shutdown();
+ statusInterrupt = -1;
+ agent = null;
+ status = -1f;
+ super.update(new Integer((int) status));
+ try {
+ updateComputationOnWS(status, null);
+ } catch (Exception e) {
+ }
+ System.gc();
+ }
+ if (status != previousStatus) {
+ AnalysisLogger.getLogger().debug("STATUS update to:" + status + " - status interrupt " + statusInterrupt);
+ previousStatus = status;
+ super.update(new Integer((int) status));
+ try {
+ updateComputationOnWS(status, null);
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ AnalysisLogger.getLogger().debug("WARNING - STATUS RETRIEVAL EXCEPTION " + e.getLocalizedMessage());
+ AnalysisLogger.getLogger().debug(e);
+ // stream = statusInterrupt;
+ }
- if (status!=previousStatus){
- AnalysisLogger.getLogger().debug("STATUS update to:" + status+" - status interrupt "+statusInterrupt);
- previousStatus=status;
- super.update(new Integer((int) status));
- updateComputationOnWS(status, null);
- }
- }catch(Exception e){
- AnalysisLogger.getLogger().debug("WARNING: STATUS RETRIEVAL EXCEPTION "+e.getLocalizedMessage());
- //stream = statusInterrupt;
- }
}
}
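For readability, the interrupt check inside updateStatus can be read as the standalone predicate below (illustration only, not part of this commit). previousLength corresponds to statusInterrupt and currentLength to stream; the constants 3 and 468 are carried over from the code above, and their exact meaning is an assumption:

    // mirrors the condition: statusInterrupt == 0 || statusInterrupt > stream - 3 && stream != 468
    static boolean looksInterrupted(long previousLength, long currentLength) {
        if (previousLength == 0)
            return false; // first observation: nothing to compare against yet
        if (previousLength > currentLength - 3 && currentLength != 468)
            return false; // stored response did not grow beyond the small tolerance: keep running
        return true; // otherwise treat the growth of the stored WPS response as a client-side cancellation
    }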
public void updateComputationOnWS(float status, String exception) {
updateComputationOnWS(status, exception, null, null);
}
public void updateComputationOnWS(float status, String exception, List<StoredData> inputData, List<File> generatedData) {
if (currentComputation != null) {
- currentComputation.setStatus(""+status);
- if (exception!=null && exception.length()>0)
+ currentComputation.setStatus("" + status);
+ if (exception != null && exception.length() > 0)
currentComputation.setException(exception);
DataspaceManager manager = new DataspaceManager(config, currentComputation, inputData, null, generatedData);
try {
@@ -311,7 +328,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
}
}
@Execute
public void run() throws Exception {
String algorithm = "";
@@ -383,9 +400,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
// merging wps with ecological engine parameters - modifies the
// config
AnalysisLogger.getLogger().info("6 - Translating WPS Inputs into Ecological Engine Inputs");
- AnalysisLogger.getLogger().debug("Operator class is "+this.getClass().getCanonicalName());
+ AnalysisLogger.getLogger().debug("Operator class is " + this.getClass().getCanonicalName());
// build computation Data
- currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", config.getTaskID(), configManager.getUsername(),config.getGcubeScope(),this.getClass().getCanonicalName());
+ currentComputation = new ComputationData(config.getTaskID(), config.getAgent(), "", "", startTime, "-", "0", config.getTaskID(), configManager.getUsername(), config.getGcubeScope(), this.getClass().getCanonicalName());
inputsManager.mergeWpsAndEcologicalInputs(supportDatabaseInfo);
generatedInputTables = inputsManager.getGeneratedTables();
generatedFiles = inputsManager.getGeneratedInputFiles();
@@ -408,9 +425,10 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
time("A priori output retrieval");
// run the computation
AnalysisLogger.getLogger().info("9 - Running the computation and updater");
- AnalysisLogger.getLogger().info("Initializing the computation");
+ AnalysisLogger.getLogger().info("Initializing the WPS status of the computation");
updateStatus(0);
+ AnalysisLogger.getLogger().info("Initializing the computation");
agent.init();
AnalysisLogger.getLogger().info("Updating status");
runStatusUpdater();
@@ -440,13 +458,13 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
// delete all temporary tables
AnalysisLogger.getLogger().info("12 - Deleting possible generated temporary tables");
AnalysisLogger.getLogger().debug("Final Computation Output: " + outputs);
endTime = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(System.currentTimeMillis());
- if (statusInterrupt!=-1){
+ if (statusInterrupt != -1) {
saveComputationOnWS(inputsManager.getProvenanceData(), outputmanager.getProvenanceData(), agent, generatedFiles);
- }else{
+ } else {
AnalysisLogger.getLogger().debug("Computation interrupted - no update");
- throw new Exception ("Computation cancelled");
+ throw new Exception("Computation cancelled");
}
AnalysisLogger.getLogger().debug("All done");
} catch (Exception e) {
@@ -454,15 +472,15 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
AnalysisLogger.getLogger().debug(e);
e.printStackTrace();
int exitstatus = -2;
- if (statusInterrupt==-1)
+ if (statusInterrupt == -1)
exitstatus = -1;
- if (inputsManager!=null)
- updateComputationOnWS(exitstatus,e.getMessage(),inputsManager.getProvenanceData(),generatedFiles);
- else
- updateComputationOnWS(exitstatus,e.getMessage());
- if (statusInterrupt==-1)
- throw new Exception ("Computation cancelled");
+ if (inputsManager != null)
+ updateComputationOnWS(exitstatus, e.getMessage(), inputsManager.getProvenanceData(), generatedFiles);
+ else
+ updateComputationOnWS(exitstatus, e.getMessage());
+ if (statusInterrupt == -1)
+ throw new Exception("Computation cancelled");
else
throw e;
} finally {
@@ -479,7 +497,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
displayTimes();
cleanResources();
AnalysisLogger.getLogger().debug("All done - Computation Finished");
}
}
@@ -488,7 +506,7 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
@Override
public void run() {
- while (agent != null && statusInterrupt!=-1 && agent.getStatus() < 100) {
+ while (agent != null && statusInterrupt != -1 && agent.getStatus() < 100) {
try {
updateStatus(agent.getStatus());
Thread.sleep(10000);
@@ -510,9 +528,9 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
private void saveComputationOnWS(List<StoredData> inputData, List<StoredData> outputData, ComputationalAgent agent, List<File> generatedFiles) {
- AnalysisLogger.getLogger().debug("Provenance manager started for operator "+this.getClass().getCanonicalName());
- ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(),config.getParam(ConfigurationManager.serviceUserNameParameterVariable),config.getGcubeScope(),this.getClass().getCanonicalName());
+ AnalysisLogger.getLogger().debug("Provenance manager started for operator " + this.getClass().getCanonicalName());
+ ComputationData computation = new ComputationData(config.getTaskID(), config.getAgent(), agent.getDescription(), agent.getInfrastructure().name(), startTime, endTime, "100", config.getTaskID(), config.getParam(ConfigurationManager.serviceUserNameParameterVariable), config.getGcubeScope(), this.getClass().getCanonicalName());
// post on WS
DataspaceManager manager = new DataspaceManager(config, computation, inputData, outputData, generatedFiles);
Thread t = new Thread(manager);
@@ -542,66 +560,57 @@ public class AbstractEcologicalEngineMapper extends AbstractAnnotatedAlgorithm {
}
private void cleanResources() {
times = null;
agent = null;
// manage open files and garbage
AnalysisLogger.getLogger().debug("Managing open files");
- // String checkOpenFiles = "ls -l /proc/*/fd/* 2|grep \"wps/ecocfg\"";
- try{
- String checkOpenFiles = "for i in `ls -l /proc/*/fd/* 2>/dev/null | grep delete | grep tomcat | awk '{print $9}'`; do du -hL $i | awk '{print $1}' | tr '\n' ' '; ls -l $i | awk '{print $6\" \"$7\" \"$8\" \"$9\" \"$10\" \"$11\" \"$12}'; done";
- List<String> openFiles =command(checkOpenFiles,"./");
- AnalysisLogger.getLogger().debug("Open Files "+openFiles);
- if (openFiles!=null){
- for (String openFile:openFiles){
- if (!openFile.contains("cannot access") && openFile.contains("(deleted)")){
- String size = openFile.substring(0,openFile.indexOf(" ")).trim();
- String pid = openFile.substring(openFile.indexOf("/proc/"),openFile.indexOf("->"));
- pid = pid.trim();
- if (!size.equals("0")){
- AnalysisLogger.getLogger().debug("Killing "+pid + " with size "+size);
- command(":>"+pid,"./");
+ // String checkOpenFiles = "ls -l /proc/*/fd/* 2|grep \"wps/ecocfg\"";
+ try {
+ String checkOpenFiles = "for i in `ls -l /proc/*/fd/* 2>/dev/null | grep delete | grep tomcat | awk '{print $9}'`; do du -hL $i | awk '{print $1}' | tr '\n' ' '; ls -l $i | awk '{print $6\" \"$7\" \"$8\" \"$9\" \"$10\" \"$11\" \"$12}'; done";
+ List<String> openFiles = command(checkOpenFiles, "./");
+ AnalysisLogger.getLogger().debug("Open Files " + openFiles);
+ if (openFiles != null) {
+ for (String openFile : openFiles) {
+ if (!openFile.contains("cannot access") && openFile.contains("(deleted)")) {
+ String size = openFile.substring(0, openFile.indexOf(" ")).trim();
+ String pid = openFile.substring(openFile.indexOf("/proc/"), openFile.indexOf("->"));
+ pid = pid.trim();
+ if (!size.equals("0")) {
+ AnalysisLogger.getLogger().debug("Killing " + pid + " with size " + size);
+ command(":>" + pid, "./");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ AnalysisLogger.getLogger().debug("Could not kill files " + e.getLocalizedMessage());
+ }
- }catch(Exception e){
- AnalysisLogger.getLogger().debug("Could not kill files "+e.getLocalizedMessage());
- }
System.gc();
}
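For context: the checkOpenFiles pipeline above lists file descriptors under /proc that point to deleted files still held open by the tomcat process, together with their sizes; the loop then truncates (":>") every non-empty one so the disk space of those deleted-but-open files is actually released. Note that the variable named pid actually holds the /proc/<pid>/fd/<n> path, not a process id.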
- public static List<String> command(final String cmdline,
- final String directory) {
- try {
- Process process =
- new ProcessBuilder(new String[] {"bash", "-c", cmdline})
- .redirectErrorStream(true)
- .directory(new File(directory))
- .start();
+ public static List<String> command(final String cmdline, final String directory) {
+ try {
+ Process process = new ProcessBuilder(new String[] { "bash", "-c", cmdline }).redirectErrorStream(true).directory(new File(directory)).start();
- List<String> output = new ArrayList<String>();
- BufferedReader br = new BufferedReader(
- new InputStreamReader(process.getInputStream()));
- String line = null;
- while ( (line = br.readLine()) != null )
- output.add(line);
+ List<String> output = new ArrayList<String>();
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line = null;
+ while ((line = br.readLine()) != null)
+ output.add(line);
- //There should really be a timeout here.
- if (0 != process.waitFor())
- return null;
+ // There should really be a timeout here.
+ if (0 != process.waitFor())
+ return null;
- return output;
+ return output;
- } catch (Exception e) {
- return null;
- }
- }
+ } catch (Exception e) {
+ return null;
+ }
+ }
}
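The TODO above ("There should really be a timeout here") could be addressed along the lines of the sketch below. This is an illustration only, not part of the commit: it assumes Java 8+ (Process.waitFor(long, TimeUnit) and destroyForcibly()) plus an extra import of java.util.concurrent.TimeUnit, and the method name commandWithTimeout is hypothetical:

    public static List<String> commandWithTimeout(final String cmdline, final String directory, final long timeoutSeconds) {
        try {
            final Process process = new ProcessBuilder("bash", "-c", cmdline).redirectErrorStream(true).directory(new File(directory)).start();
            final List<String> output = new ArrayList<String>();
            // drain stdout/stderr on a separate thread so a full pipe buffer cannot block the child process
            Thread reader = new Thread(new Runnable() {
                public void run() {
                    try {
                        BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
                        String line = null;
                        while ((line = br.readLine()) != null)
                            output.add(line);
                    } catch (Exception e) {
                        // the stream may close abruptly if the process is killed below
                    }
                }
            });
            reader.start();
            // kill the child if it does not terminate within the timeout (Java 8+ API)
            if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
                process.destroyForcibly();
                return null;
            }
            reader.join();
            return (process.exitValue() == 0) ? output : null;
        } catch (Exception e) {
            return null;
        }
    }

Reading the output on a separate thread matters here: waiting first and reading afterwards could deadlock once the child fills its output pipe, while reading first without a reader thread would make the timeout useless if the child never closes its output.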