diff --git a/distro/gcube-app.xml b/distro/gcube-app.xml
index 53ceda5..4954e3c 100644
--- a/distro/gcube-app.xml
+++ b/distro/gcube-app.xml
@@ -6,8 +6,6 @@
/gcube/service/resultset/*
- /gcube/service/*
-
diff --git a/distro/profile.xml b/distro/profile.xml
index 2acd75f..f043418 100644
--- a/distro/profile.xml
+++ b/distro/profile.xml
@@ -17,7 +17,6 @@
${artifactId}
${version}
- library
${build.finalName}.war
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java
new file mode 100644
index 0000000..dedaa7c
--- /dev/null
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java
@@ -0,0 +1,80 @@
+package org.gcube.data.spd.executor.jobs;
+
+import java.io.Serializable;
+import java.util.Calendar;
+
+import org.gcube.data.spd.model.service.types.JobStatus;
+
+public class SerializableSpeciesJob extends SpeciesJob implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private JobStatus status;
+ private String id;
+ private int completedEntries;
+ private Calendar startDate;
+ private Calendar endDate;
+
+ public SerializableSpeciesJob(JobStatus status, String id,
+ int completedEntries, Calendar startDate, Calendar endDate) {
+ super();
+ this.status = status!=JobStatus.COMPLETED?JobStatus.FAILED:JobStatus.COMPLETED;
+ this.id = id;
+ this.completedEntries = completedEntries;
+ this.startDate = startDate;
+ this.endDate = endDate;
+ }
+
+
+
+ @Override
+ public void execute() {}
+
+
+
+ @Override
+ public boolean isResubmitPermitted() {
+ return false;
+ }
+
+
+
+ @Override
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public void setStatus(JobStatus status) {
+ this.status= status;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public boolean validateInput(String input) {
+ return false;
+ }
+
+ @Override
+ public int getCompletedEntries() {
+ return completedEntries;
+ }
+
+ @Override
+ public Calendar getStartDate() {
+ return startDate;
+ }
+
+ @Override
+ public Calendar getEndDate() {
+ return endDate;
+ }
+
+}
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
index 3bbc0ff..45ed938 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
@@ -1,24 +1,72 @@
package org.gcube.data.spd.executor.jobs;
-import java.io.Serializable;
import java.util.Calendar;
+import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
+import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
+import org.gcube.accounting.persistence.AccountingPersistence;
+import org.gcube.accounting.persistence.AccountingPersistenceFactory;
+import org.gcube.common.authorization.library.provider.AuthorizationProvider;
+import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.data.spd.model.service.types.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public interface SpeciesJob extends Serializable, Runnable {
+public abstract class SpeciesJob implements Runnable {
- public JobStatus getStatus() ;
-
- public void setStatus(JobStatus status) ;
+ private static Logger log = LoggerFactory.getLogger(SpeciesJob.class);
+
+ public abstract JobStatus getStatus() ;
+
+ public abstract void setStatus(JobStatus status) ;
+
+ public abstract String getId();
+
+ public abstract boolean validateInput(String input);
+
+ public abstract int getCompletedEntries();
+
+ public abstract Calendar getStartDate();
+
+ public abstract Calendar getEndDate();
+
+ public abstract void execute();
+
+ public abstract boolean isResubmitPermitted();
+
+ public final void run(){
+ if (getStatus()!=JobStatus.PENDING && !isResubmitPermitted()){
+ log.warn("the job with id {} cannot be resubmitted",getId());
+ throw new IllegalStateException("this job cannot be resubmitted");
+ }
+ try{
+ execute();
+ }catch(Exception e){
+ log.error("unexpected exception in job, setting status to FAILED",e);
+ this.setStatus(JobStatus.FAILED);
+ }
+ generateAccounting();
+ }
- public String getId();
- public boolean validateInput(String input);
-
- public int getCompletedEntries();
-
- public Calendar getStartDate();
-
- public Calendar getEndDate();
+ private final void generateAccounting(){
+ AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
+ JobUsageRecord jobUsageRecord = new JobUsageRecord();
+ try{
+
+ jobUsageRecord.setConsumerId(AuthorizationProvider.instance.get().getClient().getId());
+ jobUsageRecord.setScope(ScopeProvider.instance.get());
+ jobUsageRecord.setJobName(this.getClass().getSimpleName());
+ jobUsageRecord.setOperationResult(getStatus()==JobStatus.COMPLETED?OperationResult.SUCCESS:OperationResult.FAILED);
+ jobUsageRecord.setJobId(this.getId());
+ jobUsageRecord.setJobStartTime(this.getStartDate());
+ jobUsageRecord.setJobEndTime(this.getEndDate());
+
+ persistence.account(jobUsageRecord);
+ log.info("Job {} accounted successfully",getId());
+ }catch(Exception ex){
+ log.warn("invalid record passed to accounting ",ex);
+ }
+ }
}
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
index c6d62d2..963ec29 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
@@ -1,9 +1,9 @@
package org.gcube.data.spd.executor.jobs;
-public interface URLJob extends SpeciesJob {
+public abstract class URLJob extends SpeciesJob {
- public String getResultURL() ;
+ public abstract String getResultURL() ;
- public String getErrorURL() ;
+ public abstract String getErrorURL() ;
}
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
index 43fae3d..6f239e2 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
@@ -8,12 +8,6 @@ import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
public class CSVCreator extends CSVJob{
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
-
private static transient OccurrenceCSVConverter converter;
public CSVCreator(Map plugins) {
@@ -31,4 +25,5 @@ public class CSVCreator extends CSVJob{
public List getHeader() {
return OccurrenceCSVConverter.HEADER;
}
+
}
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
index 3494235..6bfaffb 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
@@ -7,12 +7,6 @@ import org.gcube.data.spd.model.products.OccurrencePoint;
import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
public class CSVCreatorForOMJob extends CSVJob{
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
private static transient OccurrenceCSVConverterOpenModeller converter;
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
index 2caf264..4aa17fd 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
@@ -30,14 +30,7 @@ import org.gcube.data.streams.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class CSVJob implements URLJob{
-
-
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+abstract class CSVJob extends URLJob{
private static Logger logger = LoggerFactory.getLogger(CSVJob.class);
@@ -62,9 +55,8 @@ abstract class CSVJob implements URLJob{
this.plugins = plugins;
}
-
@Override
- public void run() {
+ public void execute() {
File csvFile = null;
try{
this.startDate = Calendar.getInstance();
@@ -161,7 +153,6 @@ abstract class CSVJob implements URLJob{
@Override
public String getErrorURL() {
- // TODO Auto-generated method stub
return null;
}
@@ -191,4 +182,10 @@ abstract class CSVJob implements URLJob{
return startDate;
}
+ @Override
+ public boolean isResubmitPermitted() {
+ return false;
+ }
+
+
}
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
index fdebb17..c1875c4 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
@@ -37,12 +37,7 @@ import org.gcube.data.streams.generators.Generator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DarwinCoreJob implements URLJob{
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+public class DarwinCoreJob extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DarwinCoreJob.class);
@@ -67,10 +62,14 @@ public class DarwinCoreJob implements URLJob{
this.status = JobStatus.PENDING;
this.plugins = plugins;
}
-
@Override
- public void run() {
+ public boolean isResubmitPermitted() {
+ return false;
+ }
+
+ @Override
+ public void execute() {
File darwincoreFile =null;
File errorFile = null;
try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
index 8497ec8..bfb6cb8 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
@@ -35,12 +35,7 @@ import org.gcube.data.spd.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DWCAJobByChildren implements URLJob{
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+public class DWCAJobByChildren extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
@@ -73,7 +68,7 @@ public class DWCAJobByChildren implements URLJob{
}
private AbstractPlugin pluginToUse = null;
-
+
private AbstractPlugin getPlugin(String key) throws Exception{
if (pluginToUse==null){
String pluginName = Util.getProviderFromKey(key);
@@ -89,7 +84,12 @@ public class DWCAJobByChildren implements URLJob{
}
@Override
- public void run() {
+ public boolean isResubmitPermitted() {
+ return true;
+ }
+
+ @Override
+ public void execute() {
File errorFile = null;
File dwcaFile = null;
try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
index d391f9b..72dcf01 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
@@ -36,12 +36,7 @@ import org.gcube.data.streams.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DWCAJobByIds implements URLJob{
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+public class DWCAJobByIds extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
@@ -66,9 +61,17 @@ public class DWCAJobByIds implements URLJob{
this.status = JobStatus.PENDING;
}
+
@Override
- public void run() {
+ public boolean isResubmitPermitted() {
+ return false;
+ }
+
+
+
+ @Override
+ public void execute() {
File errorsFile= null;
File dwcaFile = null;
try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
index f06e08c..68b8361 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
@@ -24,12 +24,8 @@ import org.slf4j.LoggerFactory;
import com.thoughtworks.xstream.XStream;
-public class LayerCreatorJob implements URLJob{
+public class LayerCreatorJob extends URLJob{
- /**
- *
- */
- private static final long serialVersionUID = -6560318170190865925L;
private static Logger logger = LoggerFactory.getLogger(LayerCreatorJob.class);
@@ -55,6 +51,11 @@ public class LayerCreatorJob implements URLJob{
this.plugins = plugins;
this.metadata = (MetadataDetails) new XStream().fromXML(metadataDetails);
}
+
+ @Override
+ public boolean isResubmitPermitted() {
+ return false;
+ }
@Override
public JobStatus getStatus() {
@@ -107,7 +108,7 @@ public class LayerCreatorJob implements URLJob{
}
@Override
- public void run() {
+ public void execute() {
try{
this.startDate = Calendar.getInstance();
this.status = JobStatus.RUNNING;
diff --git a/src/main/java/org/gcube/data/spd/manager/AppInitializer.java b/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
index 940fe65..c6c1136 100644
--- a/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
+++ b/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
@@ -1,7 +1,20 @@
package org.gcube.data.spd.manager;
-import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import net.sf.ehcache.CacheManager;
+
import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.data.spd.executor.jobs.SerializableSpeciesJob;
+import org.gcube.data.spd.executor.jobs.SpeciesJob;
import org.gcube.data.spd.plugin.PluginManager;
import org.gcube.data.spd.utils.ExecutorsContainer;
import org.gcube.smartgears.ApplicationManager;
@@ -12,27 +25,96 @@ import org.slf4j.LoggerFactory;
public class AppInitializer implements ApplicationManager {
- private static final Logger log = LoggerFactory.getLogger(AppInitializer.class);
+ private static final Logger logger = LoggerFactory.getLogger(AppInitializer.class);
+
+ private static final String jobMapFileNamePrefix = "jobs";
+
+ private HashMap jobMap;
private PluginManager pluginManager;
private ApplicationContext ctx = ContextProvider.get();
+
@Override
public void onInit() {
- log.info("security token is "+SecurityTokenProvider.instance.get());
+ logger.info("[TEST] init called for SPD in scope {} ", ScopeProvider.instance.get());
+ jobMap= new HashMap();
pluginManager = new PluginManager(ctx);
+ loadJobMap();
}
@Override
public void onShutdown() {
+ storeJobMap();
pluginManager.shutdown();
pluginManager = null;
ExecutorsContainer.stopAll();
- log.info("App Initializer shut down on "+ScopeProvider.instance.get());
+ CacheManager.getInstance().shutdown();
+ logger.info("[TEST] App Initializer shut down on "+ScopeProvider.instance.get());
}
public PluginManager getPluginManager() {
return pluginManager;
}
+
+ public HashMap getJobMap() {
+ return jobMap;
+ }
+
+ private void storeJobMap(){
+ String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
+ String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
+ logger.trace("[TEST] storing job map file {}",jobMapFileName);
+ HashMap spdJobMap = new HashMap();
+ for (Entry entry : jobMap.entrySet() ){
+ logger.trace("[TEST] stored job with id {}",entry.getKey());
+ SpeciesJob spdJob = entry.getValue();
+ if (spdJob instanceof SerializableSpeciesJob)
+ spdJobMap.put(entry.getKey(),(SerializableSpeciesJob)spdJob);
+ else
+ spdJobMap.put(entry.getKey(), new SerializableSpeciesJob(spdJob.getStatus(), spdJob.getId(),
+ spdJob.getCompletedEntries(), spdJob.getStartDate(), spdJob.getEndDate()));
+ }
+
+ File file = null;
+ try {
+ file = ctx.persistence().file(jobMapFileName);
+ if (file.exists()) file.delete();
+ file.createNewFile();
+
+ try(ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))){
+ oos.writeObject(spdJobMap);
+ }
+ } catch (Exception e) {
+ logger.error("error writing jobMapof type "+jobMap.getClass().getName()+" on disk",e);
+ if (file !=null && file.exists()) file.delete();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadJobMap(){
+ String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
+ String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
+ logger.trace("[TEST] loading job Map from file {} ",jobMapFileName);
+ File file = ctx.persistence().file(jobMapFileName);
+ if (file.exists()){
+ file.delete();
+ }
+ try {
+ file.createNewFile();
+ } catch (IOException e1) {
+ logger.error("cannot create file {}",file.getAbsolutePath(),e1);
+ jobMap= new HashMap();
+ return;
+ }
+ try (ObjectInput ois = new ObjectInputStream(new FileInputStream(file))){
+ jobMap = (HashMap) ois.readObject();
+ } catch (Exception e) {
+ logger.warn("[TEST] the file {} doesn't exist, creating an empty map",file.getAbsolutePath());
+ jobMap= new HashMap();
+ }
+ logger.trace("[TEST] loaded map is with lenght {} ",jobMap.size());
+ }
+
}
diff --git a/src/main/java/org/gcube/data/spd/manager/search/Search.java b/src/main/java/org/gcube/data/spd/manager/search/Search.java
index 5ba56ca..5aad0d0 100644
--- a/src/main/java/org/gcube/data/spd/manager/search/Search.java
+++ b/src/main/java/org/gcube/data/spd/manager/search/Search.java
@@ -66,6 +66,7 @@ public class Search {
@SuppressWarnings("unchecked")
public void search(Map> searchableMapping, Query parsedQuery, Condition ... properties) throws UnsupportedCapabilityException, UnsupportedPluginException, Exception {
+
ClosableWriter outputWriter = new Writer(wrapper);
//preparing the query (and checking semantic)
List> workers = new ArrayList>();
diff --git a/src/main/java/org/gcube/data/spd/resources/Executor.java b/src/main/java/org/gcube/data/spd/resources/Executor.java
index 5e63945..c506882 100644
--- a/src/main/java/org/gcube/data/spd/resources/Executor.java
+++ b/src/main/java/org/gcube/data/spd/resources/Executor.java
@@ -1,14 +1,6 @@
package org.gcube.data.spd.resources;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
@@ -55,9 +47,7 @@ public class Executor {
private static Logger logger = LoggerFactory.getLogger(Executor.class);
- public static HashMap jobMap= new HashMap();
- private static final String jobMapFileName = "jobs.ser";
AppInitializer initializer = (AppInitializer)ApplicationManagerProvider.get(AppInitializer.class);
@@ -66,7 +56,6 @@ public class Executor {
@GET
@Path("result/{jobKey}")
public String getResultLink(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
-
String node;
String jobId;
@@ -79,11 +68,14 @@ public class Executor {
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
- if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
- return ((URLJob)jobMap.get(jobId)).getResultURL();
+ if (!initializer.getJobMap().containsKey(jobId)){
+ logger.error("id not valid {} ",jobId);
+ throw new InvalidIdentifierException(jobId);
+ }
+ return ((URLJob)initializer.getJobMap().get(jobId)).getResultURL();
}else {
- //TODO
- return null; // remoteJobCall(node).getResultLink(jobKey);
+ logger.error("node not valid {} ",node);
+ throw new InvalidIdentifierException();
}
}
@@ -103,18 +95,21 @@ public class Executor {
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
- if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException();
- return ((URLJob)jobMap.get(jobId)).getErrorURL();
+ if (!initializer.getJobMap().containsKey(jobId)){
+ logger.error("id not valid {} ",jobId);
+ throw new InvalidIdentifierException();
+ }
+ return ((URLJob)initializer.getJobMap().get(jobId)).getErrorURL();
}else{
- //TODO
- return null; // remoteJobCall(node).getErrorLink(jobKey);
+ logger.error("node not valid {} ",node);
+ throw new InvalidIdentifierException();
}
}
@GET
@Path("status/{jobKey}")
public CompleteJobStatus getStatus(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
-
+ logger.trace("[TEST] job status called with id {}", jobKey);
String node;
String jobId;
@@ -122,18 +117,18 @@ public class Executor {
node = extractNode(jobKey);
jobId = extractId(jobKey);
}catch (IdNotValidException e) {
- logger.error("id not valid "+jobKey,e);
+ logger.error("id not valid {} ",jobKey,e);
throw new InvalidIdentifierException(jobKey);
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
- if (!jobMap.containsKey(jobId)){
- logger.trace("id not found, throwing IDNotValidExceoption");
+ if (!initializer.getJobMap().containsKey(jobId)){
+ logger.warn("id not found, throwing IDNotValidExceoption");
throw new InvalidIdentifierException(jobId);
}
- SpeciesJob job = jobMap.get(jobId);
+ SpeciesJob job = initializer.getJobMap().get(jobId);
CompleteJobStatus status = new CompleteJobStatus();
@@ -155,60 +150,17 @@ public class Executor {
return status;
}else{
- //TODO
- return null ; //remoteJobCall(node).getStatus(jobKey);
+ logger.error("node not valid {} ",node);
+ throw new InvalidIdentifierException();
}
}
-
- public static void storeJobMap(ApplicationContext context){
- logger.trace("calling store job Map");
- ObjectOutputStream oos = null;
- File file = null;
- try {
- file = context.persistence().file(jobMapFileName);
- //if (file.exists()) file.delete();
- //file.createNewFile();
- oos = new ObjectOutputStream(new FileOutputStream(file));
- oos.writeObject(jobMap);
-
- } catch (Exception e) {
- logger.error("error writing jobMapof type "+jobMap.getClass().getName()+" on disk",e);
- if (file !=null && file.exists()) file.delete();
- }finally{
- if (oos!=null)
- try {
- oos.close();
- } catch (IOException e) {
- logger.warn("error closing stream",e);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- public static void loadJobMap(ApplicationContext context){
- logger.trace("calling load job Map");
- ObjectInput ois;
- try {
- ois = new ObjectInputStream(new FileInputStream(context.persistence().file(jobMapFileName)));
- jobMap = (HashMap) ois.readObject();
- for (Entry entry : jobMap.entrySet())
- if (entry.getValue().getStatus().equals(JobStatus.RUNNING))
- entry.getValue().setStatus(JobStatus.FAILED);
- ois.close();
- } catch (Exception e) {
- logger.trace("the file doesn't exist, creating an empty map");
- jobMap = new HashMap();
- }
- }
-
-
@DELETE
@Path("{jobKey}")
public void removeJob(@PathParam("jobKey") String jobId) throws InvalidIdentifierException {
- if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
- jobMap.remove(jobId);
+ if (!initializer.getJobMap().containsKey(jobId)) throw new InvalidIdentifierException(jobId);
+ initializer.getJobMap().remove(jobId);
}
@@ -249,15 +201,15 @@ public class Executor {
if (job ==null || !job.validateInput(request.getInput()))
throw new InvalidJobException();
-
+
String jobId = executeJob(job);
-
+ logger.trace("[TEST] job submitted with id {}", jobId);
return new SubmitJobResponse(job.getId(), jobId, cxt.profile(GCoreEndpoint.class).id());
}
private String executeJob(SpeciesJob job){
- jobMap.put(job.getId(), job);
+ initializer.getJobMap().put(job.getId(), job);
ExecutorsContainer.execJob(job);
return createKey(job.getId());
}
diff --git a/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java b/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
index 23d42b0..8cc5ef4 100644
--- a/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
+++ b/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
@@ -3,15 +3,17 @@ package org.gcube.data.spd.utils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class ExecutorsContainer {
private static final int MAX_SEARCH_THREAD_POOL= 100;
private static final int MAX_JOB_POOL= 10;
- private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL);;
+ private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL, new ThreadFactoryBuilder().setNameFormat("spd-search-thread-%d").build());
- private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL);
+ private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL,new ThreadFactoryBuilder().setNameFormat("spd-job-thread-%d").build());
public static void execSearch(Runnable runnable){