diff --git a/pom.xml b/pom.xml index b9ac0c8..0074df1 100644 --- a/pom.xml +++ b/pom.xml @@ -252,11 +252,12 @@ + --> org.apache.maven.plugins maven-assembly-plugin @@ -278,4 +279,6 @@ + + \ No newline at end of file diff --git a/src/main/java/org/gcube/data/spd/resources/Classification.java b/src/main/java/org/gcube/data/spd/resources/Classification.java index 269ea63..092a207 100644 --- a/src/main/java/org/gcube/data/spd/resources/Classification.java +++ b/src/main/java/org/gcube/data/spd/resources/Classification.java @@ -13,6 +13,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.Response; +import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.data.spd.exception.MaxRetriesReachedException; import org.gcube.data.spd.manager.AppInitializer; import org.gcube.data.spd.manager.TaxonomyItemWriterManager; @@ -26,6 +27,7 @@ import org.gcube.data.spd.model.products.TaxonomyItem; import org.gcube.data.spd.model.service.exceptions.InvalidIdentifierException; import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException; import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.spd.model.util.Capabilities; import org.gcube.data.spd.plugin.PluginManager; import org.gcube.data.spd.plugin.fwk.AbstractPlugin; @@ -89,7 +91,8 @@ public class Classification { redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(wrapper.getLocator()); logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse( wrapper.getLocator(), null, ctx.profile(GCoreEndpoint.class).id()); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); @@ -138,7 +141,8 @@ public class Classification { redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(wrapper.getLocator()); logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse(wrapper.getLocator(), null, ctx.profile(GCoreEndpoint.class).id()); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); @@ -193,7 +197,8 @@ public class Classification { redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(wrapper.getLocator()); logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse(null, wrapper.getLocator(), ctx.profile(GCoreEndpoint.class).id()); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); diff --git a/src/main/java/org/gcube/data/spd/resources/Executor.java b/src/main/java/org/gcube/data/spd/resources/Executor.java index 6d752da..5e63945 100644 --- a/src/main/java/org/gcube/data/spd/resources/Executor.java +++ b/src/main/java/org/gcube/data/spd/resources/Executor.java @@ -16,11 +16,12 @@ import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; -import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.common.resources.gcore.HostingNode; import org.gcube.data.spd.executor.jobs.SpeciesJob; import org.gcube.data.spd.executor.jobs.URLJob; @@ -39,9 +40,8 @@ import org.gcube.data.spd.model.service.types.CompleteJobStatus; import org.gcube.data.spd.model.service.types.JobStatus; import org.gcube.data.spd.model.service.types.NodeStatus; import org.gcube.data.spd.model.service.types.SubmitJob; -import org.gcube.data.spd.model.util.SerializableList; +import org.gcube.data.spd.model.service.types.SubmitJobResponse; import org.gcube.data.spd.plugin.PluginManager; -import org.gcube.data.spd.utils.DynamicList; import org.gcube.data.spd.utils.DynamicMap; import org.gcube.data.spd.utils.ExecutorsContainer; import org.gcube.smartgears.ApplicationManagerProvider; @@ -204,35 +204,6 @@ public class Executor { } - @PUT - @Path("input/{jobKey}") - @Consumes(MediaType.APPLICATION_XML) - public boolean submitJob(@PathParam("jobKey") String jobKey, SerializableList input) throws InvalidIdentifierException { - //String node; - String jobId; - try{ - //node = extractNode(jobKey); - jobId = extractId(jobKey); - }catch (IdNotValidException e) { - logger.error("id not valid "+jobKey,e); - throw new InvalidIdentifierException(jobKey); - } - logger.trace("job Id extracted is {} ",jobId); - if (input.getValuesList().isEmpty()){ - logger.info("closing input stream"); - DynamicMap.remove(jobId); - } - else { - DynamicList list = DynamicMap.get(jobId); - for (String id : input.getValuesList()){ - logger.trace("elaborating input id ",id); - if (!list.add(id)) return false; - } - } - return true; - } - - @DELETE @Path("{jobKey}") public void removeJob(@PathParam("jobKey") String jobId) throws InvalidIdentifierException { @@ -244,7 +215,8 @@ public class Executor { @POST @Path("execute") @Consumes(MediaType.APPLICATION_XML) - public String submitJob(SubmitJob request) throws InvalidJobException { + @Produces(MediaType.APPLICATION_XML) + public SubmitJobResponse submitJob(SubmitJob request) throws InvalidJobException { PluginManager pluginManger = initializer.getPluginManager(); SpeciesJob job = null; switch (request.getJob()) { @@ -277,7 +249,10 @@ public class Executor { if (job ==null || !job.validateInput(request.getInput())) throw new InvalidJobException(); - return executeJob(job); + + String jobId = executeJob(job); + + return new SubmitJobResponse(job.getId(), jobId, cxt.profile(GCoreEndpoint.class).id()); } diff --git a/src/main/java/org/gcube/data/spd/resources/Manager.java b/src/main/java/org/gcube/data/spd/resources/Manager.java index c72eff2..f677aa3 100644 --- a/src/main/java/org/gcube/data/spd/resources/Manager.java +++ b/src/main/java/org/gcube/data/spd/resources/Manager.java @@ -16,6 +16,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.data.spd.caching.QueryCacheFactory; import org.gcube.data.spd.manager.AppInitializer; import org.gcube.data.spd.manager.OccurrenceWriterManager; @@ -34,6 +35,7 @@ import org.gcube.data.spd.model.products.TaxonomyItem; import org.gcube.data.spd.model.service.exceptions.QueryNotValidException; import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException; import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.spd.model.service.types.PluginDescriptions; import org.gcube.data.spd.model.util.Capabilities; import org.gcube.data.spd.plugin.PluginManager; @@ -177,7 +179,8 @@ public class Manager { redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(locator); logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse(locator, null, ctx.profile(GCoreEndpoint.class).id()); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); diff --git a/src/main/java/org/gcube/data/spd/resources/Occurrences.java b/src/main/java/org/gcube/data/spd/resources/Occurrences.java index d39f2d7..3609f19 100644 --- a/src/main/java/org/gcube/data/spd/resources/Occurrences.java +++ b/src/main/java/org/gcube/data/spd/resources/Occurrences.java @@ -1,16 +1,18 @@ package org.gcube.data.spd.resources; +import static org.gcube.data.streams.dsl.Streams.convert; + import java.net.URI; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.UUID; import javax.ws.rs.GET; import javax.ws.rs.Path; -import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; import org.gcube.common.authorization.library.AuthorizedTasks; +import org.gcube.common.resources.gcore.GCoreEndpoint; import org.gcube.data.spd.exception.MaxRetriesReachedException; import org.gcube.data.spd.manager.AppInitializer; import org.gcube.data.spd.manager.OccurrenceWriterManager; @@ -19,6 +21,7 @@ import org.gcube.data.spd.model.exceptions.ExternalRepositoryException; import org.gcube.data.spd.model.exceptions.IdNotValidException; import org.gcube.data.spd.model.exceptions.StreamBlockingException; import org.gcube.data.spd.model.products.OccurrencePoint; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.spd.plugin.PluginManager; import org.gcube.data.spd.plugin.fwk.AbstractPlugin; import org.gcube.data.spd.plugin.fwk.capabilities.OccurrencesCapability; @@ -28,12 +31,12 @@ import org.gcube.data.spd.plugin.fwk.writers.Writer; import org.gcube.data.spd.plugin.fwk.writers.rswrapper.AbstractWrapper; import org.gcube.data.spd.plugin.fwk.writers.rswrapper.LocalWrapper; import org.gcube.data.spd.plugin.fwk.writers.rswrapper.ResultWrapper; +import org.gcube.data.spd.utils.DynamicMap; import org.gcube.data.spd.utils.ExecutorsContainer; import org.gcube.data.spd.utils.QueryRetryCall; import org.gcube.data.spd.utils.ResultWrapperMantainer; import org.gcube.data.spd.utils.VOID; import org.gcube.data.streams.Stream; -import org.gcube.data.streams.dsl.Streams; import org.gcube.smartgears.ApplicationManagerProvider; import org.gcube.smartgears.ContextProvider; import org.gcube.smartgears.context.application.ApplicationContext; @@ -57,16 +60,18 @@ public class Occurrences{ @GET @Path("keys") - public Response getByKeys(@QueryParam("keys") List keys) { + public Response getByKeys() { try{ - logger.trace("keys arrived are {} ",keys); - - Stream reader = Streams.convert(keys.iterator()); + String inputLocatorId = UUID.randomUUID().toString(); + DynamicMap.put(inputLocatorId); + + logger.trace("locator used as input is {} ",inputLocatorId); + + Stream reader = convert(DynamicMap.get(inputLocatorId)); ResultWrapper wrapper = ResultWrapperMantainer.getWrapper(OccurrencePoint.class); - logger.trace("entering in the getOccurrence by productKeys with keys {}",keys); ExecutorsContainer.execJob(AuthorizedTasks.bind(new RunnableOccurrenceSearch(reader, wrapper, ExecType.KEYS))); // the output will be probably returned even before @@ -74,9 +79,12 @@ public class Occurrences{ StringBuilder redirectUri = new StringBuilder(); redirectUri.append("http://").append(ctx.container().configuration().hostname()).append(":").append(ctx.container().configuration().port()); redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(wrapper.getLocator()); + logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse(wrapper.getLocator(), inputLocatorId, ctx.profile(GCoreEndpoint.class).id()); + logger.trace("retrnign multilocator {}",multiLocatorResponse); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); @@ -89,9 +97,13 @@ public class Occurrences{ @GET @Path("ids") - public Response getByIds(@QueryParam("ids") List ids){ + public Response getByIds(){ try{ - Stream reader = Streams.convert(ids.iterator()); + + String inputLocatorId = UUID.randomUUID().toString(); + DynamicMap.put(inputLocatorId); + + Stream reader = convert(DynamicMap.get(inputLocatorId)); ResultWrapper wrapper = ResultWrapperMantainer.getWrapper(OccurrencePoint.class); ExecutorsContainer.execJob(AuthorizedTasks.bind(new RunnableOccurrenceSearch(reader, wrapper, ExecType.IDS))); @@ -102,7 +114,8 @@ public class Occurrences{ redirectUri.append(ctx.application().getContextPath()).append(Constants.APPLICATION_ROOT_PATH).append("/").append(Constants.RESULTSET_PATH).append("/").append(wrapper.getLocator()); logger.trace("redirect uri is {} ",redirectUri.toString()); try{ - return Response.temporaryRedirect(new URI(redirectUri.toString())).build(); + MultiLocatorResponse multiLocatorResponse = new MultiLocatorResponse(wrapper.getLocator(), inputLocatorId, ctx.profile(GCoreEndpoint.class).id()); + return Response.temporaryRedirect(new URI(redirectUri.toString())).entity(multiLocatorResponse).build(); }catch(Exception e){ logger.error("invalid redirect uri created",e); return Response.serverError().build(); diff --git a/src/main/java/org/gcube/data/spd/resources/ResultSetEndpoint.java b/src/main/java/org/gcube/data/spd/resources/ResultSetEndpoint.java index 9250687..6934958 100644 --- a/src/main/java/org/gcube/data/spd/resources/ResultSetEndpoint.java +++ b/src/main/java/org/gcube/data/spd/resources/ResultSetEndpoint.java @@ -1,13 +1,19 @@ package org.gcube.data.spd.resources; +import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import org.gcube.data.spd.model.Constants; +import org.gcube.data.spd.model.service.exceptions.InvalidIdentifierException; +import org.gcube.data.spd.model.util.SerializableList; +import org.gcube.data.spd.utils.DynamicList; +import org.gcube.data.spd.utils.DynamicMap; import org.gcube.data.spd.utils.ResultWrapperMantainer; import org.glassfish.jersey.server.ChunkedOutput; import org.slf4j.Logger; @@ -17,8 +23,8 @@ import org.slf4j.LoggerFactory; @Path(value = Constants.RESULTSET_PATH) public class ResultSetEndpoint { - Logger logger = LoggerFactory.getLogger(ResultSetEndpoint.class); - + private static Logger logger = LoggerFactory.getLogger(ResultSetEndpoint.class); + @GET @Produces(MediaType.TEXT_XML) @Path("{locator}") @@ -35,4 +41,28 @@ public class ResultSetEndpoint { ResultWrapperMantainer.remove(locator); } + @PUT + @Consumes(MediaType.APPLICATION_XML) + @Path("{locator}") + public boolean sendInput(@PathParam("locator") String id, SerializableList input) throws InvalidIdentifierException { + //String node; + DynamicList list = DynamicMap.get(id); + if (list==null){ + logger.error("id not valid "+id); + throw new InvalidIdentifierException(id); + } + logger.trace("input id is {} ",id); + if (input.getValuesList().isEmpty()){ + logger.info("closing input stream"); + DynamicMap.remove(id); + } + else { + for (String singleInput : input.getValuesList()){ + logger.trace("elaborating input",singleInput); + if (!list.add(singleInput)) return false; + } + } + return true; + } + } diff --git a/src/main/java/org/gcube/data/spd/utils/DynamicList.java b/src/main/java/org/gcube/data/spd/utils/DynamicList.java index c0d2533..873380b 100644 --- a/src/main/java/org/gcube/data/spd/utils/DynamicList.java +++ b/src/main/java/org/gcube/data/spd/utils/DynamicList.java @@ -11,8 +11,8 @@ public class DynamicList implements Iterator{ private static Logger logger = LoggerFactory.getLogger(DynamicList.class); - private final static long TIMEOUT_IN_MILLIS = 1000; - private final static int RETRY = 10; + private final static long TIMEOUT_IN_MILLIS = 2000; + private final static int RETRY = 20; private LinkedBlockingQueue internalQueue = new LinkedBlockingQueue(50);