From b613a2611946196caf08c8d29ad77187b04539e8 Mon Sep 17 00:00:00 2001
From: Lucio Lelii
Date: Tue, 28 Feb 2017 11:22:58 +0000
Subject: [PATCH] git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/branches/data-access/species-products-discovery/3.0@144393 82a268e6-3cf1-43bd-a215-b396298e98cf

---
 distro/gcube-app.xml                          |   2 -
 distro/profile.xml                            |   1 -
 .../executor/jobs/SerializableSpeciesJob.java |  80 ++++++++++++++
 .../data/spd/executor/jobs/SpeciesJob.java    |  74 ++++++++++---
 .../gcube/data/spd/executor/jobs/URLJob.java  |   6 +-
 .../spd/executor/jobs/csv/CSVCreator.java     |   7 +-
 .../executor/jobs/csv/CSVCreatorForOMJob.java |   6 --
 .../data/spd/executor/jobs/csv/CSVJob.java    |  19 ++--
 .../jobs/darwincore/DarwinCoreJob.java        |  15 ++-
 .../executor/jobs/dwca/DWCAJobByChildren.java |  16 +--
 .../spd/executor/jobs/dwca/DWCAJobByIds.java  |  17 +--
 .../executor/jobs/layer/LayerCreatorJob.java  |  13 +--
 .../data/spd/manager/AppInitializer.java      |  90 +++++++++++++++-
 .../gcube/data/spd/manager/search/Search.java |   1 +
 .../gcube/data/spd/resources/Executor.java    | 100 +++++------------
 .../data/spd/utils/ExecutorsContainer.java    |   6 +-
 16 files changed, 302 insertions(+), 151 deletions(-)
 create mode 100644 src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java

diff --git a/distro/gcube-app.xml b/distro/gcube-app.xml
index 53ceda5..4954e3c 100644
--- a/distro/gcube-app.xml
+++ b/distro/gcube-app.xml
@@ -6,8 +6,6 @@
   /gcube/service/resultset/*
-  /gcube/service/*
-
diff --git a/distro/profile.xml b/distro/profile.xml
index 2acd75f..f043418 100644
--- a/distro/profile.xml
+++ b/distro/profile.xml
@@ -17,7 +17,6 @@
   ${artifactId}
   ${version}
-  library
   ${build.finalName}.war
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java
new file mode 100644
index 0000000..dedaa7c
--- /dev/null
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/SerializableSpeciesJob.java
@@ -0,0 +1,80 @@
+package org.gcube.data.spd.executor.jobs;
+
+import java.io.Serializable;
+import java.util.Calendar;
+
+import org.gcube.data.spd.model.service.types.JobStatus;
+
+public class SerializableSpeciesJob extends SpeciesJob implements Serializable {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	private JobStatus status;
+	private String id;
+	private int completedEntries;
+	private Calendar startDate;
+	private Calendar endDate;
+
+	public SerializableSpeciesJob(JobStatus status, String id,
+			int completedEntries, Calendar startDate, Calendar endDate) {
+		super();
+		this.status = status!=JobStatus.COMPLETED?JobStatus.FAILED:JobStatus.COMPLETED;
+		this.id = id;
+		this.completedEntries = completedEntries;
+		this.startDate = startDate;
+		this.endDate = endDate;
+	}
+
+	@Override
+	public void execute() {}
+
+	@Override
+	public boolean isResubmitPermitted() {
+		return false;
+	}
+
+	@Override
+	public JobStatus getStatus() {
+		return status;
+	}
+
+	@Override
+	public void setStatus(JobStatus status) {
+		this.status= status;
+	}
+
+	@Override
+	public String getId() {
+		return id;
+	}
+
+	@Override
+	public boolean validateInput(String input) {
+		return false;
+	}
+
+	@Override
+	public int getCompletedEntries() {
+		return completedEntries;
+	}
+
+	@Override
+	public Calendar getStartDate() {
+		return startDate;
+	}
+
+	@Override
+	public Calendar getEndDate() {
+		return endDate;
+	}
+
+}
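SerializableSpeciesJob is a passive snapshot: execute() is a no-op and isResubmitPermitted() returns false, so a job restored from disk can only report its final state. A minimal sketch of the intended use, mirroring what AppInitializer.storeJobMap() below does inline (the helper name is hypothetical, not part of the patch):

    // Snapshot a live job into its serializable form before persisting the job map.
    // The constructor collapses any non-COMPLETED status to FAILED, so a restarted
    // service never reports a job as still RUNNING.
    static SerializableSpeciesJob snapshot(SpeciesJob job) {
        return new SerializableSpeciesJob(job.getStatus(), job.getId(),
                job.getCompletedEntries(), job.getStartDate(), job.getEndDate());
    }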
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
index 3bbc0ff..45ed938 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/SpeciesJob.java
@@ -1,24 +1,72 @@
 package org.gcube.data.spd.executor.jobs;
 
-import java.io.Serializable;
 import java.util.Calendar;
 
+import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
+import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
+import org.gcube.accounting.persistence.AccountingPersistence;
+import org.gcube.accounting.persistence.AccountingPersistenceFactory;
+import org.gcube.common.authorization.library.provider.AuthorizationProvider;
+import org.gcube.common.scope.api.ScopeProvider;
 import org.gcube.data.spd.model.service.types.JobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public interface SpeciesJob extends Serializable, Runnable {
+public abstract class SpeciesJob implements Runnable {
 
-	public JobStatus getStatus() ;
-
-	public void setStatus(JobStatus status) ;
+	private static Logger log = LoggerFactory.getLogger(SpeciesJob.class);
+
+	public abstract JobStatus getStatus() ;
+
+	public abstract void setStatus(JobStatus status) ;
+
+	public abstract String getId();
+
+	public abstract boolean validateInput(String input);
+
+	public abstract int getCompletedEntries();
+
+	public abstract Calendar getStartDate();
+
+	public abstract Calendar getEndDate();
+
+	public abstract void execute();
+
+	public abstract boolean isResubmitPermitted();
+
+	public final void run(){
+		if (getStatus()!=JobStatus.PENDING && !isResubmitPermitted()){
+			log.warn("the job with id {} cannot be resubmitted",getId());
+			throw new IllegalStateException("this job cannot be resubmitted");
+		}
+		try{
+			execute();
+		}catch(Exception e){
+			log.error("unexpected exception in job, setting status to FAILED",e);
+			this.setStatus(JobStatus.FAILED);
+		}
+		generateAccounting();
+	}
 
-	public String getId();
 
-	public boolean validateInput(String input);
-
-	public int getCompletedEntries();
-
-	public Calendar getStartDate();
-
-	public Calendar getEndDate();
+	private final void generateAccounting(){
+		AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
+		JobUsageRecord jobUsageRecord = new JobUsageRecord();
+		try{
+			jobUsageRecord.setConsumerId(AuthorizationProvider.instance.get().getClient().getId());
+			jobUsageRecord.setScope(ScopeProvider.instance.get());
+			jobUsageRecord.setJobName(this.getClass().getSimpleName());
+			jobUsageRecord.setOperationResult(getStatus()==JobStatus.COMPLETED?OperationResult.SUCCESS:OperationResult.FAILED);
+			jobUsageRecord.setJobId(this.getId());
+			jobUsageRecord.setJobStartTime(this.getStartDate());
+			jobUsageRecord.setJobEndTime(this.getEndDate());
+
+			persistence.account(jobUsageRecord);
+			log.info("Job {} accounted successfully",getId());
+		}catch(Exception ex){
+			log.warn("invalid record passed to accounting",ex);
+		}
+	}
 }
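SpeciesJob.run() is now a template method: it guards against resubmission, delegates to execute(), downgrades the status to FAILED on any uncaught exception, and always emits a JobUsageRecord. A hypothetical minimal subclass (not part of the patch) showing what a concrete job must now provide:

    import java.util.Calendar;
    import java.util.UUID;

    import org.gcube.data.spd.executor.jobs.SpeciesJob;
    import org.gcube.data.spd.model.service.types.JobStatus;

    // Hypothetical no-op job: the smallest class satisfying the new contract.
    public class NoOpJob extends SpeciesJob {

        private JobStatus status = JobStatus.PENDING;
        private final String id = UUID.randomUUID().toString();
        private Calendar start, end;

        @Override
        public void execute() {
            status = JobStatus.RUNNING;
            start = Calendar.getInstance();
            // a real job would stream products here and track completed entries
            end = Calendar.getInstance();
            status = JobStatus.COMPLETED;
        }

        @Override
        public boolean isResubmitPermitted() { return false; }
        @Override
        public JobStatus getStatus() { return status; }
        @Override
        public void setStatus(JobStatus status) { this.status = status; }
        @Override
        public String getId() { return id; }
        @Override
        public boolean validateInput(String input) { return input != null; }
        @Override
        public int getCompletedEntries() { return 0; }
        @Override
        public Calendar getStartDate() { return start; }
        @Override
        public Calendar getEndDate() { return end; }
    }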
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
index c6d62d2..963ec29 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/URLJob.java
@@ -1,9 +1,9 @@
 package org.gcube.data.spd.executor.jobs;
 
-public interface URLJob extends SpeciesJob {
+public abstract class URLJob extends SpeciesJob {
 
-	public String getResultURL() ;
+	public abstract String getResultURL() ;
 
-	public String getErrorURL() ;
+	public abstract String getErrorURL() ;
 
 }
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
index 43fae3d..6f239e2 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreator.java
@@ -8,12 +8,6 @@ import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
 
 public class CSVCreator extends CSVJob{
 
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-
-	private static transient OccurrenceCSVConverter converter;
 
 	public CSVCreator(Map<String, AbstractPlugin> plugins) {
@@ -31,4 +25,5 @@ public class CSVCreator extends CSVJob{
 	public List<String> getHeader() {
 		return OccurrenceCSVConverter.HEADER;
 	}
+
 }
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
index 3494235..6bfaffb 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVCreatorForOMJob.java
@@ -7,12 +7,6 @@ import org.gcube.data.spd.model.products.OccurrencePoint;
 import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
 
 public class CSVCreatorForOMJob extends CSVJob{
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-
 	private static transient OccurrenceCSVConverterOpenModeller converter;
 
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
index 2caf264..4aa17fd 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/csv/CSVJob.java
@@ -30,14 +30,7 @@ import org.gcube.data.streams.Stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class CSVJob implements URLJob{
-
-
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
+abstract class CSVJob extends URLJob{
 
 	private static Logger logger = LoggerFactory.getLogger(CSVJob.class);
 
@@ -62,9 +55,8 @@ abstract class CSVJob implements URLJob{
 		this.plugins = plugins;
 	}
 
-	@Override
-	public void run() {
+	public void execute() {
 		File csvFile = null;
 		try{
 			this.startDate = Calendar.getInstance();
@@ -161,7 +153,6 @@ abstract class CSVJob implements URLJob{
 
 	@Override
 	public String getErrorURL() {
-		// TODO Auto-generated method stub
 		return null;
 	}
 
@@ -191,4 +182,10 @@ abstract class CSVJob implements URLJob{
 		return startDate;
 	}
 
+	@Override
+	public boolean isResubmitPermitted() {
+		return false;
+	}
+
+
 }
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
index fdebb17..c1875c4 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/darwincore/DarwinCoreJob.java
@@ -37,12 +37,7 @@ import org.gcube.data.streams.generators.Generator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DarwinCoreJob implements URLJob{
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
+public class DarwinCoreJob extends URLJob{
 
 	private static Logger logger = LoggerFactory.getLogger(DarwinCoreJob.class);
 
@@ -67,10 +62,14 @@ public class DarwinCoreJob implements URLJob{
 		this.status = JobStatus.PENDING;
 		this.plugins = plugins;
 	}
-	@Override
-	public void run() {
+
+	@Override
+	public boolean isResubmitPermitted() {
+		return false;
+	}
+
+	@Override
+	public void execute() {
 		File darwincoreFile =null;
 		File errorFile = null;
 		try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
index 8497ec8..bfb6cb8 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByChildren.java
@@ -35,12 +35,7 @@ import org.gcube.data.spd.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DWCAJobByChildren implements URLJob{
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
+public class DWCAJobByChildren extends URLJob{
 
 	private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
 
@@ -73,7 +68,7 @@ public class DWCAJobByChildren implements URLJob{
 	}
 
 	private AbstractPlugin pluginToUse = null;
-	
+
 	private AbstractPlugin getPlugin(String key) throws Exception{
 		if (pluginToUse==null){
 			String pluginName = Util.getProviderFromKey(key);
@@ -89,7 +84,12 @@ public class DWCAJobByChildren implements URLJob{
 	}
 
 	@Override
-	public void run() {
+	public boolean isResubmitPermitted() {
+		return true;
+	}
+
+	@Override
+	public void execute() {
 		File errorFile = null;
 		File dwcaFile = null;
 		try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
index d391f9b..72dcf01 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/dwca/DWCAJobByIds.java
@@ -36,12 +36,7 @@ import org.gcube.data.streams.Stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DWCAJobByIds implements URLJob{
-
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
+public class DWCAJobByIds extends URLJob{
 
 	private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
 
@@ -66,9 +61,17 @@ public class DWCAJobByIds implements URLJob{
 		this.status = JobStatus.PENDING;
 	}
 
+
 	@Override
-	public void run() {
+	public boolean isResubmitPermitted() {
+		return false;
+	}
+
+
+
+	@Override
+	public void execute() {
 		File errorsFile= null;
 		File dwcaFile = null;
 		try{
diff --git a/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java b/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
index f06e08c..68b8361 100644
--- a/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
+++ b/src/main/java/org/gcube/data/spd/executor/jobs/layer/LayerCreatorJob.java
@@ -24,12 +24,8 @@ import org.slf4j.LoggerFactory;
 
 import com.thoughtworks.xstream.XStream;
 
-public class LayerCreatorJob implements URLJob{
+public class LayerCreatorJob extends URLJob{
 
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = -6560318170190865925L;
 
 	private static Logger logger = LoggerFactory.getLogger(LayerCreatorJob.class);
 
@@ -55,6 +51,11 @@ public class LayerCreatorJob implements URLJob{
 		this.plugins = plugins;
 		this.metadata = (MetadataDetails) new XStream().fromXML(metadataDetails);
 	}
+
+	@Override
+	public boolean isResubmitPermitted() {
+		return false;
+	}
 
 	@Override
 	public JobStatus getStatus() {
@@ -107,7 +108,7 @@ public class LayerCreatorJob implements URLJob{
 	}
 
 	@Override
-	public void run() {
+	public void execute() {
 		try{
 			this.startDate = Calendar.getInstance();
 			this.status = JobStatus.RUNNING;
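Of the concrete jobs touched here, only DWCAJobByChildren reports isResubmitPermitted() == true; together with the guard in SpeciesJob.run(), a finished DWCA-by-children job can be handed back to the executor, while resubmitting any other finished job fails fast. A hypothetical caller-side sketch (the helper is illustrative, not part of the patch):

    // SpeciesJob.run() enforces this same check, so re-enqueueing a finished
    // job only succeeds for jobs that explicitly permit resubmission.
    static void resubmit(SpeciesJob job) {
        if (job.getStatus() != JobStatus.PENDING && !job.isResubmitPermitted())
            throw new IllegalStateException("job " + job.getId() + " cannot be resubmitted");
        ExecutorsContainer.execJob(job); // re-enqueue on the job thread pool
    }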
diff --git a/src/main/java/org/gcube/data/spd/manager/AppInitializer.java b/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
index 940fe65..c6c1136 100644
--- a/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
+++ b/src/main/java/org/gcube/data/spd/manager/AppInitializer.java
@@ -1,7 +1,20 @@
 package org.gcube.data.spd.manager;
 
-import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import net.sf.ehcache.CacheManager;
+
 import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.data.spd.executor.jobs.SerializableSpeciesJob;
+import org.gcube.data.spd.executor.jobs.SpeciesJob;
 import org.gcube.data.spd.plugin.PluginManager;
 import org.gcube.data.spd.utils.ExecutorsContainer;
 import org.gcube.smartgears.ApplicationManager;
@@ -12,27 +25,96 @@
 import org.slf4j.LoggerFactory;
 
 public class AppInitializer implements ApplicationManager {
 
-	private static final Logger log = LoggerFactory.getLogger(AppInitializer.class);
+	private static final Logger logger = LoggerFactory.getLogger(AppInitializer.class);
+
+	private static final String jobMapFileNamePrefix = "jobs";
+
+	private HashMap<String, SpeciesJob> jobMap;
 
 	private PluginManager pluginManager;
 
 	private ApplicationContext ctx = ContextProvider.get();
 
+	@Override
 	public void onInit() {
-		log.info("security token is "+SecurityTokenProvider.instance.get());
+		logger.info("[TEST] init called for SPD in scope {} ", ScopeProvider.instance.get());
+		jobMap= new HashMap<String, SpeciesJob>();
 		pluginManager = new PluginManager(ctx);
+		loadJobMap();
 	}
 
 	@Override
 	public void onShutdown() {
+		storeJobMap();
 		pluginManager.shutdown();
 		pluginManager = null;
 		ExecutorsContainer.stopAll();
-		log.info("App Initializer shut down on "+ScopeProvider.instance.get());
+		CacheManager.getInstance().shutdown();
+		logger.info("[TEST] App Initializer shut down on "+ScopeProvider.instance.get());
 	}
 
 	public PluginManager getPluginManager() {
 		return pluginManager;
 	}
+
+	public HashMap<String, SpeciesJob> getJobMap() {
+		return jobMap;
+	}
+
+	private void storeJobMap(){
+		String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
+		String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
+		logger.trace("[TEST] storing job map file {}",jobMapFileName);
+		HashMap<String, SerializableSpeciesJob> spdJobMap = new HashMap<String, SerializableSpeciesJob>();
+		for (Entry<String, SpeciesJob> entry : jobMap.entrySet() ){
+			logger.trace("[TEST] stored job with id {}",entry.getKey());
+			SpeciesJob spdJob = entry.getValue();
+			if (spdJob instanceof SerializableSpeciesJob)
+				spdJobMap.put(entry.getKey(),(SerializableSpeciesJob)spdJob);
+			else
+				spdJobMap.put(entry.getKey(), new SerializableSpeciesJob(spdJob.getStatus(), spdJob.getId(),
+						spdJob.getCompletedEntries(), spdJob.getStartDate(), spdJob.getEndDate()));
+		}
+
+		File file = null;
+		try {
+			file = ctx.persistence().file(jobMapFileName);
+			if (file.exists()) file.delete();
+			file.createNewFile();
+
+			try(ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))){
+				oos.writeObject(spdJobMap);
+			}
+		} catch (Exception e) {
+			logger.error("error writing jobMap of type "+jobMap.getClass().getName()+" on disk",e);
+			if (file !=null && file.exists()) file.delete();
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void loadJobMap(){
+		String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
+		String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
+		logger.trace("[TEST] loading job map from file {}",jobMapFileName);
+		File file = ctx.persistence().file(jobMapFileName);
+		if (!file.exists()){
+			logger.trace("[TEST] the file {} doesn't exist, starting with an empty map",file.getAbsolutePath());
+			jobMap= new HashMap<String, SpeciesJob>();
+			return;
+		}
+		try (ObjectInput ois = new ObjectInputStream(new FileInputStream(file))){
+			jobMap = (HashMap<String, SpeciesJob>) ois.readObject();
+		} catch (IOException | ClassNotFoundException e) {
+			logger.warn("[TEST] cannot read the job map from file {}, starting with an empty map",file.getAbsolutePath(),e);
+			jobMap= new HashMap<String, SpeciesJob>();
+		}
+		logger.trace("[TEST] loaded map with length {}",jobMap.size());
+	}
+
 }
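The job map is now persisted per scope under the application's persistence area, with the scope path folded into the file name. Purely for illustration (the scope value below is hypothetical):

    // How the per-scope file name is derived in storeJobMap()/loadJobMap():
    String scope = "/gcube/devsec";                          // hypothetical scope
    String jobMapFileName = "jobs" + scope.replaceAll("/", "."); // -> "jobs.gcube.devsec"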
diff --git a/src/main/java/org/gcube/data/spd/manager/search/Search.java b/src/main/java/org/gcube/data/spd/manager/search/Search.java
index 5ba56ca..5aad0d0 100644
--- a/src/main/java/org/gcube/data/spd/manager/search/Search.java
+++ b/src/main/java/org/gcube/data/spd/manager/search/Search.java
@@ -66,6 +66,7 @@ public class Search {
 
 	@SuppressWarnings("unchecked")
 	public void search(Map> searchableMapping, Query parsedQuery, Condition ... properties) throws UnsupportedCapabilityException, UnsupportedPluginException, Exception {
+		ClosableWriter outputWriter = new Writer(wrapper);
 		//preparing the query (and checking semantic)
 		List> workers = new ArrayList>();
diff --git a/src/main/java/org/gcube/data/spd/resources/Executor.java b/src/main/java/org/gcube/data/spd/resources/Executor.java
index 5e63945..c506882 100644
--- a/src/main/java/org/gcube/data/spd/resources/Executor.java
+++ b/src/main/java/org/gcube/data/spd/resources/Executor.java
@@ -1,14 +1,6 @@
 package org.gcube.data.spd.resources;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -55,9 +47,7 @@ public class Executor {
 
 	private static Logger logger = LoggerFactory.getLogger(Executor.class);
 
-	public static HashMap<String, SpeciesJob> jobMap= new HashMap<String, SpeciesJob>();
-
-	private static final String jobMapFileName = "jobs.ser";
 
 	AppInitializer initializer = (AppInitializer)ApplicationManagerProvider.get(AppInitializer.class);
 
@@ -66,7 +56,6 @@ public class Executor {
 	@GET
 	@Path("result/{jobKey}")
 	public String getResultLink(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
-
 		String node;
 		String jobId;
 
@@ -79,11 +68,14 @@ public class Executor {
 		}
 
 		if (node.equals(cxt.container().profile(HostingNode.class).id())){
-			if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
-			return ((URLJob)jobMap.get(jobId)).getResultURL();
+			if (!initializer.getJobMap().containsKey(jobId)){
+				logger.error("id not valid {} ",jobId);
+				throw new InvalidIdentifierException(jobId);
+			}
+			return ((URLJob)initializer.getJobMap().get(jobId)).getResultURL();
 		}else {
-			//TODO
-			return null; // remoteJobCall(node).getResultLink(jobKey);
+			logger.error("node not valid {} ",node);
+			throw new InvalidIdentifierException();
 		}
 	}
 
@@ -103,18 +95,21 @@ public class Executor {
 		}
 
 		if (node.equals(cxt.container().profile(HostingNode.class).id())){
-			if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException();
-			return ((URLJob)jobMap.get(jobId)).getErrorURL();
+			if (!initializer.getJobMap().containsKey(jobId)){
+				logger.error("id not valid {} ",jobId);
+				throw new InvalidIdentifierException();
+			}
+			return ((URLJob)initializer.getJobMap().get(jobId)).getErrorURL();
 		}else{
-			//TODO
-			return null; // remoteJobCall(node).getErrorLink(jobKey);
+			logger.error("node not valid {} ",node);
+			throw new InvalidIdentifierException();
 		}
 	}
 
 	@GET
 	@Path("status/{jobKey}")
 	public CompleteJobStatus getStatus(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
-
+		logger.trace("[TEST] job status called with id {}", jobKey);
 		String node;
 		String jobId;
 
@@ -122,18 +117,18 @@ public class Executor {
 			node = extractNode(jobKey);
 			jobId = extractId(jobKey);
 		}catch (IdNotValidException e) {
-			logger.error("id not valid "+jobKey,e);
+			logger.error("id not valid {} ",jobKey,e);
 			throw new InvalidIdentifierException(jobKey);
 		}
 
 		if (node.equals(cxt.container().profile(HostingNode.class).id())){
 
-			if (!jobMap.containsKey(jobId)){
-				logger.trace("id not found, throwing IDNotValidExceoption");
+			if (!initializer.getJobMap().containsKey(jobId)){
+				logger.warn("id not found, throwing IdNotValidException");
 				throw new InvalidIdentifierException(jobId);
 			}
 
-			SpeciesJob job = jobMap.get(jobId);
+			SpeciesJob job = initializer.getJobMap().get(jobId);
 
 			CompleteJobStatus status = new CompleteJobStatus();
 
@@ -155,60 +150,17 @@ public class Executor {
 			return status;
 
 		}else{
-			//TODO
-			return null ; //remoteJobCall(node).getStatus(jobKey);
+			logger.error("node not valid {} ",node);
+			throw new InvalidIdentifierException();
 		}
 	}
 
-
-	public static void storeJobMap(ApplicationContext context){
-		logger.trace("calling store job Map");
-		ObjectOutputStream oos = null;
-		File file = null;
-		try {
-			file = context.persistence().file(jobMapFileName);
-			//if (file.exists()) file.delete();
-			//file.createNewFile();
-			oos = new ObjectOutputStream(new FileOutputStream(file));
-			oos.writeObject(jobMap);
-
-		} catch (Exception e) {
-			logger.error("error writing jobMapof type "+jobMap.getClass().getName()+" on disk",e);
-			if (file !=null && file.exists()) file.delete();
-		}finally{
-			if (oos!=null)
-				try {
-					oos.close();
-				} catch (IOException e) {
-					logger.warn("error closing stream",e);
-				}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public static void loadJobMap(ApplicationContext context){
-		logger.trace("calling load job Map");
-		ObjectInput ois;
-		try {
-			ois = new ObjectInputStream(new FileInputStream(context.persistence().file(jobMapFileName)));
-			jobMap = (HashMap<String, SpeciesJob>) ois.readObject();
-			for (Entry<String, SpeciesJob> entry : jobMap.entrySet())
-				if (entry.getValue().getStatus().equals(JobStatus.RUNNING))
-					entry.getValue().setStatus(JobStatus.FAILED);
-			ois.close();
-		} catch (Exception e) {
-			logger.trace("the file doesn't exist, creating an empty map");
-			jobMap = new HashMap<String, SpeciesJob>();
-		}
-	}
-
-
 	@DELETE
 	@Path("{jobKey}")
 	public void removeJob(@PathParam("jobKey") String jobId) throws InvalidIdentifierException {
-		if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
-		jobMap.remove(jobId);
+		if (!initializer.getJobMap().containsKey(jobId)) throw new InvalidIdentifierException(jobId);
+		initializer.getJobMap().remove(jobId);
 	}
 
@@ -249,15 +201,15 @@ public class Executor {
 
 		if (job ==null || !job.validateInput(request.getInput()))
 			throw new InvalidJobException();
-		
+
 		String jobId = executeJob(job);
-		
+		logger.trace("[TEST] job submitted with id {}", jobId);
 		return new SubmitJobResponse(job.getId(), jobId, cxt.profile(GCoreEndpoint.class).id());
 	}
 
 	private String executeJob(SpeciesJob job){
-		jobMap.put(job.getId(), job);
+		initializer.getJobMap().put(job.getId(), job);
 		ExecutorsContainer.execJob(job);
 		return createKey(job.getId());
 	}
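getResultLink, getErrorLink, and getStatus now fail fast with InvalidIdentifierException when a key was minted on a different hosting node, instead of returning null as the old TODO branches did. The key format itself (createKey/extractNode/extractId) is not shown in this patch; a purely hypothetical sketch of such a composite key, for orientation only:

    // Hypothetical key scheme: "<nodeId>|<jobId>". The real encoding used by
    // createKey/extractNode/extractId may differ.
    static String createKey(String nodeId, String jobId) { return nodeId + "|" + jobId; }
    static String extractNode(String key) { return key.split("\\|", 2)[0]; }
    static String extractId(String key) { return key.split("\\|", 2)[1]; }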
diff --git a/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java b/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
index 23d42b0..8cc5ef4 100644
--- a/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
+++ b/src/main/java/org/gcube/data/spd/utils/ExecutorsContainer.java
@@ -3,15 +3,17 @@ package org.gcube.data.spd.utils;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class ExecutorsContainer {
 
 	private static final int MAX_SEARCH_THREAD_POOL= 100;
 
 	private static final int MAX_JOB_POOL= 10;
 
-	private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL);
+	private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL, new ThreadFactoryBuilder().setNameFormat("spd-search-thread-%d").build());
 
-	private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL);
+	private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL, new ThreadFactoryBuilder().setNameFormat("spd-job-thread-%d").build());
 
 	public static void execSearch(Runnable runnable){