Lucio Lelii 2017-02-28 11:22:58 +00:00
parent 9e57374771
commit b613a26119
16 changed files with 302 additions and 151 deletions

View File

@@ -6,8 +6,6 @@
<local-persistence location='target' />
<exclude>/gcube/service/resultset/*</exclude>
<exclude handlers='request-accounting'>/gcube/service/*</exclude>
</application>

View File

@@ -17,7 +17,6 @@
<artifactId>${artifactId}</artifactId>
<version>${version}</version>
</MavenCoordinates>
<Type>library</Type>
<Files>
<File>${build.finalName}.war</File>
</Files>

View File

@@ -0,0 +1,80 @@
package org.gcube.data.spd.executor.jobs;
import java.io.Serializable;
import java.util.Calendar;
import org.gcube.data.spd.model.service.types.JobStatus;
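// serializable snapshot of a job's state, used to persist the job map across service restarts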
public class SerializableSpeciesJob extends SpeciesJob implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private JobStatus status;
private String id;
private int completedEntries;
private Calendar startDate;
private Calendar endDate;
public SerializableSpeciesJob(JobStatus status, String id,
int completedEntries, Calendar startDate, Calendar endDate) {
super();
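// a restored snapshot cannot be re-executed, so any state other than COMPLETED is recorded as FAILED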
this.status = status!=JobStatus.COMPLETED?JobStatus.FAILED:JobStatus.COMPLETED;
this.id = id;
this.completedEntries = completedEntries;
this.startDate = startDate;
this.endDate = endDate;
}
@Override
public void execute() {}
@Override
public boolean isResubmitPermitted() {
return false;
}
@Override
public JobStatus getStatus() {
return status;
}
@Override
public void setStatus(JobStatus status) {
this.status= status;
}
@Override
public String getId() {
return id;
}
@Override
public boolean validateInput(String input) {
return false;
}
@Override
public int getCompletedEntries() {
return completedEntries;
}
@Override
public Calendar getStartDate() {
return startDate;
}
@Override
public Calendar getEndDate() {
return endDate;
}
}

View File

@@ -1,24 +1,72 @@
package org.gcube.data.spd.executor.jobs;
import java.io.Serializable;
import java.util.Calendar;
import org.gcube.accounting.datamodel.UsageRecord.OperationResult;
import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistence;
import org.gcube.accounting.persistence.AccountingPersistenceFactory;
import org.gcube.common.authorization.library.provider.AuthorizationProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.data.spd.model.service.types.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface SpeciesJob extends Serializable, Runnable {
public abstract class SpeciesJob implements Runnable {
public JobStatus getStatus() ;
public void setStatus(JobStatus status) ;
private static Logger log = LoggerFactory.getLogger(SpeciesJob.class);
public abstract JobStatus getStatus() ;
public abstract void setStatus(JobStatus status) ;
public abstract String getId();
public abstract boolean validateInput(String input);
public abstract int getCompletedEntries();
public abstract Calendar getStartDate();
public abstract Calendar getEndDate();
public abstract void execute();
public abstract boolean isResubmitPermitted();
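// template method: runs the job once, marks it FAILED on unexpected errors, then accounts the execution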
public final void run(){
if (getStatus()!=JobStatus.PENDING && !isResubmitPermitted()){
log.warn("the job with id {} cannot be resubmitted",getId());
throw new IllegalStateException("this job cannot be resubmitted");
}
try{
execute();
}catch(Exception e){
log.error("unexpected exception in job, setting status to FAILED",e);
this.setStatus(JobStatus.FAILED);
}
generateAccounting();
}
public String getId();
public boolean validateInput(String input);
public int getCompletedEntries();
public Calendar getStartDate();
public Calendar getEndDate();
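// reports the job execution to the gCube accounting subsystem as a JobUsageRecord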
private final void generateAccounting(){
AccountingPersistence persistence = AccountingPersistenceFactory.getPersistence();
JobUsageRecord jobUsageRecord = new JobUsageRecord();
try{
jobUsageRecord.setConsumerId(AuthorizationProvider.instance.get().getClient().getId());
jobUsageRecord.setScope(ScopeProvider.instance.get());
jobUsageRecord.setJobName(this.getClass().getSimpleName());
jobUsageRecord.setOperationResult(getStatus()==JobStatus.COMPLETED?OperationResult.SUCCESS:OperationResult.FAILED);
jobUsageRecord.setJobId(this.getId());
jobUsageRecord.setJobStartTime(this.getStartDate());
jobUsageRecord.setJobEndTime(this.getEndDate());
persistence.account(jobUsageRecord);
log.info("Job {} accounted successfully",getId());
}catch(Exception ex){
log.warn("invalid record passed to accounting ",ex);
}
}
}

View File

@@ -1,9 +1,9 @@
package org.gcube.data.spd.executor.jobs;
public interface URLJob extends SpeciesJob {
public abstract class URLJob extends SpeciesJob {
public String getResultURL() ;
public abstract String getResultURL() ;
public String getErrorURL() ;
public abstract String getErrorURL() ;
}

View File

@@ -8,12 +8,6 @@ import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
public class CSVCreator extends CSVJob{
/**
*
*/
private static final long serialVersionUID = 1L;
private static transient OccurrenceCSVConverter converter;
public CSVCreator(Map<String, AbstractPlugin> plugins) {
@@ -31,4 +25,5 @@ public class CSVCreator extends CSVJob{
public List<String> getHeader() {
return OccurrenceCSVConverter.HEADER;
}
}

View File

@@ -7,12 +7,6 @@ import org.gcube.data.spd.model.products.OccurrencePoint;
import org.gcube.data.spd.plugin.fwk.AbstractPlugin;
public class CSVCreatorForOMJob extends CSVJob{
/**
*
*/
private static final long serialVersionUID = 1L;
private static transient OccurrenceCSVConverterOpenModeller converter;

View File

@@ -30,14 +30,7 @@ import org.gcube.data.streams.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class CSVJob implements URLJob{
/**
*
*/
private static final long serialVersionUID = 1L;
abstract class CSVJob extends URLJob{
private static Logger logger = LoggerFactory.getLogger(CSVJob.class);
@@ -62,9 +55,8 @@ abstract class CSVJob implements URLJob{
this.plugins = plugins;
}
@Override
public void run() {
public void execute() {
File csvFile = null;
try{
this.startDate = Calendar.getInstance();
@@ -161,7 +153,6 @@ abstract class CSVJob implements URLJob{
@Override
public String getErrorURL() {
// TODO Auto-generated method stub
return null;
}
@@ -191,4 +182,10 @@ abstract class CSVJob implements URLJob{
return startDate;
}
@Override
public boolean isResubmitPermitted() {
return false;
}
}

View File

@@ -37,12 +37,7 @@ import org.gcube.data.streams.generators.Generator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DarwinCoreJob implements URLJob{
/**
*
*/
private static final long serialVersionUID = 1L;
public class DarwinCoreJob extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DarwinCoreJob.class);
@@ -67,10 +62,14 @@ public class DarwinCoreJob implements URLJob{
this.status = JobStatus.PENDING;
this.plugins = plugins;
}
@Override
public void run() {
public boolean isResubmitPermitted() {
return false;
}
@Override
public void execute() {
File darwincoreFile =null;
File errorFile = null;
try{

View File

@@ -35,12 +35,7 @@ import org.gcube.data.spd.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DWCAJobByChildren implements URLJob{
/**
*
*/
private static final long serialVersionUID = 1L;
public class DWCAJobByChildren extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
@@ -73,7 +68,7 @@ public class DWCAJobByChildren implements URLJob{
}
private AbstractPlugin pluginToUse = null;
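// lazily resolves and caches the plugin referenced by the job key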
private AbstractPlugin getPlugin(String key) throws Exception{
if (pluginToUse==null){
String pluginName = Util.getProviderFromKey(key);
@@ -89,7 +84,12 @@ public class DWCAJobByChildren implements URLJob{
}
@Override
public void run() {
public boolean isResubmitPermitted() {
return true;
}
@Override
public void execute() {
File errorFile = null;
File dwcaFile = null;
try{

View File

@@ -36,12 +36,7 @@ import org.gcube.data.streams.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DWCAJobByIds implements URLJob{
/**
*
*/
private static final long serialVersionUID = 1L;
public class DWCAJobByIds extends URLJob{
private static Logger logger = LoggerFactory.getLogger(DWCAJobByChildren.class);
@@ -66,9 +61,17 @@ public class DWCAJobByIds implements URLJob{
this.status = JobStatus.PENDING;
}
@Override
public void run() {
public boolean isResubmitPermitted() {
return false;
}
@Override
public void execute() {
File errorsFile= null;
File dwcaFile = null;
try{

View File

@@ -24,12 +24,8 @@ import org.slf4j.LoggerFactory;
import com.thoughtworks.xstream.XStream;
public class LayerCreatorJob implements URLJob{
public class LayerCreatorJob extends URLJob{
/**
*
*/
private static final long serialVersionUID = -6560318170190865925L;
private static Logger logger = LoggerFactory.getLogger(LayerCreatorJob.class);
@@ -55,6 +51,11 @@ public class LayerCreatorJob implements URLJob{
this.plugins = plugins;
this.metadata = (MetadataDetails) new XStream().fromXML(metadataDetails);
}
@Override
public boolean isResubmitPermitted() {
return false;
}
@Override
public JobStatus getStatus() {
@@ -107,7 +108,7 @@ public class LayerCreatorJob implements URLJob{
}
@Override
public void run() {
public void execute() {
try{
this.startDate = Calendar.getInstance();
this.status = JobStatus.RUNNING;

View File

@@ -1,7 +1,20 @@
package org.gcube.data.spd.manager;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map.Entry;
import net.sf.ehcache.CacheManager;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.data.spd.executor.jobs.SerializableSpeciesJob;
import org.gcube.data.spd.executor.jobs.SpeciesJob;
import org.gcube.data.spd.plugin.PluginManager;
import org.gcube.data.spd.utils.ExecutorsContainer;
import org.gcube.smartgears.ApplicationManager;
@@ -12,27 +25,96 @@ import org.slf4j.LoggerFactory;
public class AppInitializer implements ApplicationManager {
private static final Logger log = LoggerFactory.getLogger(AppInitializer.class);
private static final Logger logger = LoggerFactory.getLogger(AppInitializer.class);
private static final String jobMapFileNamePrefix = "jobs";
private HashMap<String, SpeciesJob> jobMap;
private PluginManager pluginManager;
private ApplicationContext ctx = ContextProvider.get();
@Override
public void onInit() {
log.info("security token is "+SecurityTokenProvider.instance.get());
logger.info("[TEST] init called for SPD in scope {} ", ScopeProvider.instance.get());
jobMap= new HashMap<String, SpeciesJob>();
pluginManager = new PluginManager(ctx);
loadJobMap();
}
@Override
public void onShutdown() {
storeJobMap();
pluginManager.shutdown();
pluginManager = null;
ExecutorsContainer.stopAll();
log.info("App Initializer shut down on "+ScopeProvider.instance.get());
CacheManager.getInstance().shutdown();
logger.info("[TEST] App Initializer shut down on "+ScopeProvider.instance.get());
}
public PluginManager getPluginManager() {
return pluginManager;
}
public HashMap<String, SpeciesJob> getJobMap() {
return jobMap;
}
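// serializes the current job map to a per-scope file in the application persistence area (called at shutdown)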
private void storeJobMap(){
String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
logger.trace("[TEST] storing job map file {}",jobMapFileName);
HashMap<String, SerializableSpeciesJob> spdJobMap = new HashMap<String, SerializableSpeciesJob>();
for (Entry<String, SpeciesJob> entry : jobMap.entrySet() ){
logger.trace("[TEST] stored job with id {}",entry.getKey());
SpeciesJob spdJob = entry.getValue();
if (spdJob instanceof SerializableSpeciesJob)
spdJobMap.put(entry.getKey(),(SerializableSpeciesJob)spdJob);
else
spdJobMap.put(entry.getKey(), new SerializableSpeciesJob(spdJob.getStatus(), spdJob.getId(),
spdJob.getCompletedEntries(), spdJob.getStartDate(), spdJob.getEndDate()));
}
File file = null;
try {
file = ctx.persistence().file(jobMapFileName);
if (file.exists()) file.delete();
file.createNewFile();
try(ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))){
oos.writeObject(spdJobMap);
}
} catch (Exception e) {
logger.error("error writing jobMapof type "+jobMap.getClass().getName()+" on disk",e);
if (file !=null && file.exists()) file.delete();
}
}
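// restores the job map persisted by storeJobMap(), falling back to an empty map when nothing can be read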
@SuppressWarnings("unchecked")
private void loadJobMap(){
String scopeNamePrefix= ScopeProvider.instance.get().replaceAll("/", ".");
String jobMapFileName = jobMapFileNamePrefix+scopeNamePrefix;
logger.trace("[TEST] loading job Map from file {} ",jobMapFileName);
File file = ctx.persistence().file(jobMapFileName);
if (!file.exists()){
logger.trace("[TEST] no stored job map found at {}, starting with an empty map",file.getAbsolutePath());
jobMap= new HashMap<String, SpeciesJob>();
return;
}
try (ObjectInput ois = new ObjectInputStream(new FileInputStream(file))){
jobMap = (HashMap<String, SpeciesJob>) ois.readObject();
} catch (Exception e) {
logger.warn("[TEST] cannot read the stored job map from {}, starting with an empty map",file.getAbsolutePath(),e);
jobMap= new HashMap<String, SpeciesJob>();
}
logger.trace("[TEST] loaded job map has {} entries",jobMap.size());
}
}

View File

@@ -66,6 +66,7 @@ public class Search<T extends ResultElement> {
@SuppressWarnings("unchecked")
public void search(Map<String, Searchable<T>> searchableMapping, Query parsedQuery, Condition ... properties) throws UnsupportedCapabilityException, UnsupportedPluginException, Exception {
ClosableWriter<T> outputWriter = new Writer<T>(wrapper);
//preparing the query (and checking semantic)
List<Worker<?, ?>> workers = new ArrayList<Worker<?, ?>>();

View File

@@ -1,14 +1,6 @@
package org.gcube.data.spd.resources;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
@@ -55,9 +47,7 @@ public class Executor {
private static Logger logger = LoggerFactory.getLogger(Executor.class);
public static HashMap<String, SpeciesJob> jobMap= new HashMap<String, SpeciesJob>();
private static final String jobMapFileName = "jobs.ser";
AppInitializer initializer = (AppInitializer)ApplicationManagerProvider.get(AppInitializer.class);
@@ -66,7 +56,6 @@ public class Executor {
@GET
@Path("result/{jobKey}")
public String getResultLink(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
String node;
String jobId;
@@ -79,11 +68,14 @@
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
return ((URLJob)jobMap.get(jobId)).getResultURL();
if (!initializer.getJobMap().containsKey(jobId)){
logger.error("id not valid {} ",jobId);
throw new InvalidIdentifierException(jobId);
}
return ((URLJob)initializer.getJobMap().get(jobId)).getResultURL();
}else {
//TODO
return null; // remoteJobCall(node).getResultLink(jobKey);
logger.error("node not valid {} ",node);
throw new InvalidIdentifierException();
}
}
@@ -103,18 +95,21 @@ public class Executor {
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException();
return ((URLJob)jobMap.get(jobId)).getErrorURL();
if (!initializer.getJobMap().containsKey(jobId)){
logger.error("id not valid {} ",jobId);
throw new InvalidIdentifierException();
}
return ((URLJob)initializer.getJobMap().get(jobId)).getErrorURL();
}else{
//TODO
return null; // remoteJobCall(node).getErrorLink(jobKey);
logger.error("node not valid {} ",node);
throw new InvalidIdentifierException();
}
}
@GET
@Path("status/{jobKey}")
public CompleteJobStatus getStatus(@PathParam("jobKey") String jobKey) throws InvalidIdentifierException {
logger.trace("[TEST] job status called with id {}", jobKey);
String node;
String jobId;
@@ -122,18 +117,18 @@ public class Executor {
node = extractNode(jobKey);
jobId = extractId(jobKey);
}catch (IdNotValidException e) {
logger.error("id not valid "+jobKey,e);
logger.error("id not valid {} ",jobKey,e);
throw new InvalidIdentifierException(jobKey);
}
if (node.equals(cxt.container().profile(HostingNode.class).id())){
if (!jobMap.containsKey(jobId)){
logger.trace("id not found, throwing IDNotValidExceoption");
if (!initializer.getJobMap().containsKey(jobId)){
logger.warn("id not found, throwing IDNotValidExceoption");
throw new InvalidIdentifierException(jobId);
}
SpeciesJob job = jobMap.get(jobId);
SpeciesJob job = initializer.getJobMap().get(jobId);
CompleteJobStatus status = new CompleteJobStatus();
@@ -155,60 +150,17 @@ public class Executor {
return status;
}else{
//TODO
return null ; //remoteJobCall(node).getStatus(jobKey);
logger.error("node not valid {} ",node);
throw new InvalidIdentifierException();
}
}
public static void storeJobMap(ApplicationContext context){
logger.trace("calling store job Map");
ObjectOutputStream oos = null;
File file = null;
try {
file = context.persistence().file(jobMapFileName);
//if (file.exists()) file.delete();
//file.createNewFile();
oos = new ObjectOutputStream(new FileOutputStream(file));
oos.writeObject(jobMap);
} catch (Exception e) {
logger.error("error writing jobMapof type "+jobMap.getClass().getName()+" on disk",e);
if (file !=null && file.exists()) file.delete();
}finally{
if (oos!=null)
try {
oos.close();
} catch (IOException e) {
logger.warn("error closing stream",e);
}
}
}
@SuppressWarnings("unchecked")
public static void loadJobMap(ApplicationContext context){
logger.trace("calling load job Map");
ObjectInput ois;
try {
ois = new ObjectInputStream(new FileInputStream(context.persistence().file(jobMapFileName)));
jobMap = (HashMap<String, SpeciesJob>) ois.readObject();
for (Entry<String, SpeciesJob> entry : jobMap.entrySet())
if (entry.getValue().getStatus().equals(JobStatus.RUNNING))
entry.getValue().setStatus(JobStatus.FAILED);
ois.close();
} catch (Exception e) {
logger.trace("the file doesn't exist, creating an empty map");
jobMap = new HashMap<String, SpeciesJob>();
}
}
@DELETE
@Path("{jobKey}")
public void removeJob(@PathParam("jobKey") String jobId) throws InvalidIdentifierException {
if (!jobMap.containsKey(jobId)) throw new InvalidIdentifierException(jobId);
jobMap.remove(jobId);
if (!initializer.getJobMap().containsKey(jobId)) throw new InvalidIdentifierException(jobId);
initializer.getJobMap().remove(jobId);
}
@@ -249,15 +201,15 @@ public class Executor {
if (job ==null || !job.validateInput(request.getInput()))
throw new InvalidJobException();
String jobId = executeJob(job);
logger.trace("[TEST] job submitted with id {}", jobId);
return new SubmitJobResponse(job.getId(), jobId, cxt.profile(GCoreEndpoint.class).id());
}
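// registers the job in the shared job map and submits it to the job thread pool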
private String executeJob(SpeciesJob job){
jobMap.put(job.getId(), job);
initializer.getJobMap().put(job.getId(), job);
ExecutorsContainer.execJob(job);
return createKey(job.getId());
}

View File

@@ -3,15 +3,17 @@ package org.gcube.data.spd.utils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ExecutorsContainer {
private static final int MAX_SEARCH_THREAD_POOL= 100;
private static final int MAX_JOB_POOL= 10;
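// named thread factories make the SPD pools easier to identify in thread dumps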
private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL);;
private static ExecutorService searchThreadPool = Executors.newFixedThreadPool(MAX_SEARCH_THREAD_POOL, new ThreadFactoryBuilder().setNameFormat("spd-search-thread-%d").build());
private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL);
private static ExecutorService jobThreadPool = Executors.newFixedThreadPool(MAX_JOB_POOL,new ThreadFactoryBuilder().setNameFormat("spd-job-thread-%d").build());
public static void execSearch(Runnable runnable){