From 78b07ce32bce0318dbb7d2fa7a5f5615d624dbd6 Mon Sep 17 00:00:00 2001 From: Lucio Lelii Date: Mon, 20 Feb 2017 15:26:10 +0000 Subject: [PATCH] git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/branches/data-access/spd-client-library/4.0@142785 82a268e6-3cf1-43bd-a215-b396298e98cf --- .../org/gcube/data/spd/client/Constants.java | 4 +- .../data/spd/client/JerseyRecordIterator.java | 11 ++- .../client/ResultElementRecordIterator.java | 4 +- .../gcube/data/spd/client/ResultLocator.java | 29 ------- .../spd/client/plugins/AbstractPlugin.java | 7 +- .../spd/client/plugins/ResultSetPlugin.java | 2 +- .../client/proxies/DefaultClassification.java | 39 ++++------ .../spd/client/proxies/DefaultExecutor.java | 77 +++++-------------- .../spd/client/proxies/DefaultManager.java | 17 ++-- .../spd/client/proxies/DefaultOccurrence.java | 76 ++++-------------- .../spd/client/proxies/DefaultResultSet.java | 56 ++++++++++++-- .../spd/client/proxies/OccurrenceClient.java | 3 - .../spd/client/proxies/ResultSetClient.java | 2 +- 13 files changed, 123 insertions(+), 204 deletions(-) delete mode 100644 src/main/java/org/gcube/data/spd/client/ResultLocator.java diff --git a/src/main/java/org/gcube/data/spd/client/Constants.java b/src/main/java/org/gcube/data/spd/client/Constants.java index 68b90ed..ee0f981 100644 --- a/src/main/java/org/gcube/data/spd/client/Constants.java +++ b/src/main/java/org/gcube/data/spd/client/Constants.java @@ -25,7 +25,9 @@ public class Constants { public static final QName EXECUTOR_QNAME = new QName(NAMESPACE, "executor"); public static final QName RESULTSET_QNAME = new QName(NAMESPACE, "resultset"); - + + public static final int INPUT_BUNCH = 30; + /* public static final GcubeService manager = service().withName(org.gcube.data.spd.model.service.Constants.manager_name).andInterface(ManagerStubs.class); diff --git a/src/main/java/org/gcube/data/spd/client/JerseyRecordIterator.java b/src/main/java/org/gcube/data/spd/client/JerseyRecordIterator.java index 4ab47de..59b3c9f 100644 --- a/src/main/java/org/gcube/data/spd/client/JerseyRecordIterator.java +++ b/src/main/java/org/gcube/data/spd/client/JerseyRecordIterator.java @@ -43,9 +43,9 @@ public abstract class JerseyRecordIterator implements Iterator, Closeable{ private ChunkedInputReader chunkedInputReader; - public JerseyRecordIterator(ResultLocator locator, long timeout, TimeUnit timeoutUnit) { - this.resultSetClient = AbstractPlugin.resultset().at(locator.getHost(), locator.getPort()).build(); - this.locator = locator.getLocator(); + public JerseyRecordIterator(String endpointId, String locator, long timeout, TimeUnit timeoutUnit) { + this.resultSetClient = AbstractPlugin.resultset(endpointId).build(); + this.locator = locator; this.timeoutInMillis = timeoutUnit.toMillis(timeout); } @@ -55,8 +55,7 @@ public abstract class JerseyRecordIterator implements Iterator, Closeable{ public boolean hasNext() { if (this.chunkedInput==null) initializeChunckedInput(); - - + if (chunkedInput.isClosed() && queue.isEmpty()) return false; try { long startTime = System.currentTimeMillis(); @@ -64,7 +63,7 @@ public abstract class JerseyRecordIterator implements Iterator, Closeable{ while(retrievedElement==null && (System.currentTimeMillis()-startTime)<=timeoutInMillis && (!chunkedInput.isClosed() || !queue.isEmpty() )) retrievedElement = queue.poll(INTERNAL_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS); - + currentElement = retrievedElement; return currentElement!=null; } catch (InterruptedException e) { diff --git a/src/main/java/org/gcube/data/spd/client/ResultElementRecordIterator.java b/src/main/java/org/gcube/data/spd/client/ResultElementRecordIterator.java index 0ceb2be..34f2837 100644 --- a/src/main/java/org/gcube/data/spd/client/ResultElementRecordIterator.java +++ b/src/main/java/org/gcube/data/spd/client/ResultElementRecordIterator.java @@ -9,9 +9,9 @@ import org.gcube.data.spd.model.products.ResultElement; public class ResultElementRecordIterator extends JerseyRecordIterator { - public ResultElementRecordIterator(ResultLocator locator, + public ResultElementRecordIterator(String endpointId, String locator, long timeout, TimeUnit timeoutUnit) { - super(locator, timeout, timeoutUnit); + super(endpointId, locator, timeout, timeoutUnit); } @Override diff --git a/src/main/java/org/gcube/data/spd/client/ResultLocator.java b/src/main/java/org/gcube/data/spd/client/ResultLocator.java deleted file mode 100644 index 211399e..0000000 --- a/src/main/java/org/gcube/data/spd/client/ResultLocator.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.gcube.data.spd.client; - -public class ResultLocator { - - private String host; - private int port; - private String locator; - - public ResultLocator(String host, int port, String locator) { - super(); - this.host = host; - this.port = port; - this.locator = locator; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public String getLocator() { - return locator; - } - - -} diff --git a/src/main/java/org/gcube/data/spd/client/plugins/AbstractPlugin.java b/src/main/java/org/gcube/data/spd/client/plugins/AbstractPlugin.java index 3686086..2f3fdf1 100644 --- a/src/main/java/org/gcube/data/spd/client/plugins/AbstractPlugin.java +++ b/src/main/java/org/gcube/data/spd/client/plugins/AbstractPlugin.java @@ -2,6 +2,7 @@ package org.gcube.data.spd.client.plugins; import javax.ws.rs.client.WebTarget; +import org.gcube.common.clients.LegacyQuery; import org.gcube.common.clients.Plugin; import org.gcube.common.clients.ProxyBuilder; import org.gcube.common.clients.ProxyBuilderImpl; @@ -39,8 +40,10 @@ public abstract class AbstractPlugin implements Plugin { return new ProxyBuilderImpl(executor_plugin); } - public static ProxyBuilder resultset() { - return new ProxyBuilderImpl(resultset_plugin); + public static ProxyBuilder resultset(String endpointId) { + LegacyQuery query = new LegacyQuery(resultset_plugin); + query.addCondition("$resource/ID/string() eq '"+endpointId+"'"); + return new ProxyBuilderImpl(resultset_plugin, query); } public final String name; diff --git a/src/main/java/org/gcube/data/spd/client/plugins/ResultSetPlugin.java b/src/main/java/org/gcube/data/spd/client/plugins/ResultSetPlugin.java index cdfed69..a35a394 100644 --- a/src/main/java/org/gcube/data/spd/client/plugins/ResultSetPlugin.java +++ b/src/main/java/org/gcube/data/spd/client/plugins/ResultSetPlugin.java @@ -40,5 +40,5 @@ public class ResultSetPlugin extends AbstractPlugin GcubeService service = GcubeService.service().withName(Constants.RESULTSET_QNAME).andPath("resultset"); return TargetFactory.stubFor(service).at(address); } - + } diff --git a/src/main/java/org/gcube/data/spd/client/proxies/DefaultClassification.java b/src/main/java/org/gcube/data/spd/client/proxies/DefaultClassification.java index 7fdc21d..ec32b8e 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/DefaultClassification.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/DefaultClassification.java @@ -9,12 +9,11 @@ import javax.ws.rs.core.Response; import org.gcube.common.clients.Call; import org.gcube.common.clients.delegates.ProxyDelegate; import org.gcube.data.spd.client.ResultElementRecordIterator; -import org.gcube.data.spd.client.ResultLocator; -import org.gcube.data.spd.client.Utils; import org.gcube.data.spd.model.products.TaxonomyItem; import org.gcube.data.spd.model.service.exceptions.InvalidIdentifierException; import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException; import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.streams.Stream; import org.gcube.data.streams.dsl.Streams; @@ -30,18 +29,16 @@ public class DefaultClassification implements ClassificationClient{ public Stream getTaxonChildrenById(final String id) throws UnsupportedPluginException, UnsupportedCapabilityException, InvalidIdentifierException { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { Response response = manager.path("children").path(id).request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); + MultiLocatorResponse result = delegate.make(call); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); return Streams.convert(ri); }catch(Exception e) { throw new RuntimeException(e); @@ -68,18 +65,16 @@ public class DefaultClassification implements ClassificationClient{ public Stream getTaxonTreeById(final String id) throws UnsupportedPluginException, UnsupportedCapabilityException, InvalidIdentifierException { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { Response response = manager.path("tree").path(id).request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); + MultiLocatorResponse result = delegate.make(call); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); return Streams.convert(ri); }catch(Exception e) { @@ -91,18 +86,16 @@ public class DefaultClassification implements ClassificationClient{ public Stream getSynonymsById(final String id) throws UnsupportedPluginException, UnsupportedCapabilityException, InvalidIdentifierException { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { Response response = manager.path("synonyms").path(id).request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); + MultiLocatorResponse result = delegate.make(call); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); return Streams.convert(ri); }catch(Exception e) { diff --git a/src/main/java/org/gcube/data/spd/client/proxies/DefaultExecutor.java b/src/main/java/org/gcube/data/spd/client/proxies/DefaultExecutor.java index 23c72f9..264e415 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/DefaultExecutor.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/DefaultExecutor.java @@ -1,8 +1,5 @@ package org.gcube.data.spd.client.proxies; -import java.util.ArrayList; -import java.util.List; - import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -14,7 +11,7 @@ import org.gcube.data.spd.model.service.types.CompleteJobStatus; import org.gcube.data.spd.model.service.types.JobType; import org.gcube.data.spd.model.service.types.MetadataDetails; import org.gcube.data.spd.model.service.types.SubmitJob; -import org.gcube.data.spd.model.util.SerializableList; +import org.gcube.data.spd.model.service.types.SubmitJobResponse; import org.gcube.data.streams.Stream; import com.thoughtworks.xstream.XStream; @@ -28,12 +25,12 @@ public class DefaultExecutor implements ExecutorClient{ this.delegate = delegate; } - private Call getCallForJobs(final String input, final JobType job){ - Call call = new Call() { + private Call getCallForJobs(final String input, final JobType job){ + Call call = new Call() { @Override - public String call(WebTarget executor) throws Exception { + public SubmitJobResponse call(WebTarget executor) throws Exception { SubmitJob jobRequest = new SubmitJob(input, job); - return executor.path("execute").request().post(Entity.xml(jobRequest), String.class); + return executor.path("execute").request().post(Entity.xml(jobRequest), SubmitJobResponse.class); } }; return call; @@ -102,100 +99,68 @@ public class DefaultExecutor implements ExecutorClient{ } } - - private Boolean sendInputCall(final String jobId, final List input) - throws InvalidIdentifierException { - Call call = new Call() { - @Override - public Boolean call(WebTarget executor) throws Exception { - return executor.path("input").path(jobId).request().put(Entity.xml(new SerializableList(input)), Boolean.class); - } - }; - try { - return delegate.make(call); - }catch(Exception e) { - throw new InvalidIdentifierException(); - } - } - private void sendInput(String jobId, Stream stream) throws Exception{ - int bunch = 30; - List collected = new ArrayList(30); - while (stream.hasNext()){ - collected.add(stream.next()); - if (collected.size()>=bunch){ - if (!sendInputCall(jobId, collected)) - throw new Exception(); - collected.clear(); - } - } - if (collected.size()>0) - if (!sendInputCall(jobId, collected)) - throw new Exception(); - - sendInputCall(jobId, new ArrayList(0)); - } @Override public String createDwCAByChildren(String taxonKey) throws Exception { - return delegate.make(getCallForJobs(taxonKey, JobType.DWCAByChildren)); + return delegate.make(getCallForJobs(taxonKey, JobType.DWCAByChildren)).getJobId(); } @Override public String createDwCAByIds(Stream ids) throws Exception { - String jobId = delegate.make(getCallForJobs(null, JobType.DWCAById)); + SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.DWCAById)); try{ - sendInput(jobId, ids); + DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids); }catch(Exception e){ e.printStackTrace(); } - return jobId; + return response.getJobId(); } @Override public String createCSV(Stream ids) throws Exception { - String jobId = delegate.make(getCallForJobs(null, JobType.CSV)); + SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.CSV)); try{ - sendInput(jobId, ids); + DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids); }catch(Exception e){ e.printStackTrace(); } - return jobId; + return response.getJobId(); } @Override public String createLayer(Stream keys, MetadataDetails metadata) throws Exception { - String jobId = delegate.make(getCallForJobs(new XStream().toXML(metadata), JobType.LayerCreator)); + SubmitJobResponse response = delegate.make(getCallForJobs(new XStream().toXML(metadata), JobType.LayerCreator)); try{ - sendInput(jobId, keys); + DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), keys); }catch(Exception e){ e.printStackTrace(); } - return jobId; + return response.getJobId(); } @Override public String createCSVforOM(Stream ids) throws Exception { - String jobId = delegate.make(getCallForJobs(null, JobType.CSVForOM)); + SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.CSVForOM)); try{ - sendInput(jobId, ids); + DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids); }catch(Exception e){ e.printStackTrace(); } - return jobId; + return response.getJobId(); } @Override public String createDarwincoreFromOccurrenceKeys(Stream ids) throws Exception { - String jobId = delegate.make(getCallForJobs(null, JobType.DarwinCore)); + SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.DarwinCore)); try{ - sendInput(jobId, ids); + DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids); }catch(Exception e){ e.printStackTrace(); } - return jobId; + return response.getJobId(); } } diff --git a/src/main/java/org/gcube/data/spd/client/proxies/DefaultManager.java b/src/main/java/org/gcube/data/spd/client/proxies/DefaultManager.java index e4da41b..f9af5d9 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/DefaultManager.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/DefaultManager.java @@ -9,13 +9,12 @@ import javax.ws.rs.core.Response; import org.gcube.common.clients.Call; import org.gcube.common.clients.delegates.ProxyDelegate; import org.gcube.data.spd.client.ResultElementRecordIterator; -import org.gcube.data.spd.client.ResultLocator; -import org.gcube.data.spd.client.Utils; import org.gcube.data.spd.model.PluginDescription; import org.gcube.data.spd.model.exceptions.InvalidQueryException; import org.gcube.data.spd.model.products.ResultElement; import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException; import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.spd.model.service.types.PluginDescriptions; import org.gcube.data.streams.Stream; import org.gcube.data.streams.dsl.Streams; @@ -33,19 +32,17 @@ public class DefaultManager implements ManagerClient { public Stream search(final String query) throws InvalidQueryException, UnsupportedPluginException, UnsupportedCapabilityException { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { Response response = manager.path("search").queryParam("query", query).request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); + MultiLocatorResponse result = delegate.make(call); + System.out.println("MULTILOCACATOR IS "+result); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); return Streams.convert(ri); }catch(Exception e) { throw new RuntimeException(e); diff --git a/src/main/java/org/gcube/data/spd/client/proxies/DefaultOccurrence.java b/src/main/java/org/gcube/data/spd/client/proxies/DefaultOccurrence.java index d777064..20a67fa 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/DefaultOccurrence.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/DefaultOccurrence.java @@ -9,10 +9,8 @@ import javax.ws.rs.core.Response; import org.gcube.common.clients.Call; import org.gcube.common.clients.delegates.ProxyDelegate; import org.gcube.data.spd.client.ResultElementRecordIterator; -import org.gcube.data.spd.client.ResultLocator; -import org.gcube.data.spd.client.Utils; -import org.gcube.data.spd.model.PointInfo; import org.gcube.data.spd.model.products.OccurrencePoint; +import org.gcube.data.spd.model.service.types.MultiLocatorResponse; import org.gcube.data.streams.Stream; import org.gcube.data.streams.dsl.Streams; @@ -26,23 +24,18 @@ public class DefaultOccurrence implements OccurrenceClient { @Override public Stream getByIds(final List ids) { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { manager = manager.path("ids"); - for (String value: ids) - manager = manager.queryParam("ids", value); - System.out.println(manager.getUri().toString()); Response response = manager.request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); - + MultiLocatorResponse result = delegate.make(call); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); + DefaultResultSet.sendInput(result.getEndpointId(), result.getOutputLocator(), Streams.convert(ids)); return Streams.convert(ri); }catch(Exception e) { throw new RuntimeException(e); @@ -52,66 +45,23 @@ public class DefaultOccurrence implements OccurrenceClient { @Override public Stream getByKeys(final List keys) { - Call call = new Call() { + Call call = new Call() { @Override - public ResultLocator call(WebTarget manager) throws Exception { + public MultiLocatorResponse call(WebTarget manager) throws Exception { manager = manager.path("keys"); - for (String value: keys) - manager = manager.queryParam("keys", value); - System.out.println("calling "+manager.getUri().toString()); Response response = manager.request().get(Response.class); - String host = manager.getUri().getHost(); - int port = manager.getUri().getPort(); - return new ResultLocator(host, port, Utils.getLocatorFromResponse(response)); + return response.readEntity(MultiLocatorResponse.class); } }; try { - ResultLocator result = delegate.make(call); - ResultElementRecordIterator ri = new ResultElementRecordIterator(result, 2, TimeUnit.MINUTES); + MultiLocatorResponse result = delegate.make(call); + ResultElementRecordIterator ri = new ResultElementRecordIterator(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES); + DefaultResultSet.sendInput(result.getEndpointId(), result.getOutputLocator(), Streams.convert(keys)); return Streams.convert(ri); }catch(Exception e) { throw new RuntimeException(e); } } - /* - @Override - public String createLayer(Stream coordinatesStream) { - Stream convertedStream = pipe(coordinatesStream).through(new Generator() { - - @Override - public String yield(PointInfo element) { - try { - return Bindings.toXml(element); - } catch (Exception e) { - throw new StreamSkipSignal(); - } - } - - }); - - final String coordinateLocator = publishStringsIn(convertedStream).withDefaults().toString(); - - Call call = new Call() { - @Override - public String call(OccurrenceStubs occurrence) throws Exception { - return occurrence.createLayer(coordinateLocator); - } - }; - - try { - return delegate.make(call); - }catch(Exception e) { - throw again(e).asServiceException(); - } - } - */ - - @Override - public String createLayer(Stream coordinatesLocator) { - // TODO Auto-generated method stub - return null; - } - } diff --git a/src/main/java/org/gcube/data/spd/client/proxies/DefaultResultSet.java b/src/main/java/org/gcube/data/spd/client/proxies/DefaultResultSet.java index 6ea89b8..d428638 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/DefaultResultSet.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/DefaultResultSet.java @@ -1,17 +1,25 @@ package org.gcube.data.spd.client.proxies; +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import org.gcube.common.clients.Call; import org.gcube.common.clients.delegates.ProxyDelegate; import org.gcube.common.clients.stubs.jaxws.JAXWSUtils.Empty; +import org.gcube.data.spd.client.Constants; +import org.gcube.data.spd.client.plugins.AbstractPlugin; +import org.gcube.data.spd.model.util.SerializableList; +import org.gcube.data.streams.Stream; import org.glassfish.jersey.client.ChunkedInput; public class DefaultResultSet implements ResultSetClient { private final ProxyDelegate delegate; - + public DefaultResultSet(ProxyDelegate config){ this.delegate = config; } @@ -26,19 +34,19 @@ public class DefaultResultSet implements ResultSetClient { }; try { return delegate.make(call); - + }catch(Exception e) { throw new RuntimeException(e); } } - + @Override public void closeResultSet(final String locator){ Call call = new Call() { @Override public Empty call(WebTarget manager) throws Exception { - manager.path(locator).request().delete(); - return new Empty(); + manager.path(locator).request().delete(); + return new Empty(); } }; try { @@ -47,7 +55,41 @@ public class DefaultResultSet implements ResultSetClient { throw new RuntimeException(e); } } - - + + protected boolean sendInput(final String locator, final List input) { + Call call = new Call() { + @Override + public Boolean call(WebTarget manager) throws Exception { + return manager.path(locator).request().put(Entity.xml(new SerializableList(input)), Boolean.class); + } + }; + try { + return delegate.make(call); + }catch(Exception e) { + throw new RuntimeException(e); + } + } + + protected static void sendInput(final String gCoreEnpointId, final String locatorId, final Stream stream) throws Exception{ + Thread thread = new Thread(){ + public void run(){ + List collected = new ArrayList(10); + DefaultResultSet client = (DefaultResultSet)AbstractPlugin.resultset(gCoreEnpointId).build(); + while (stream.hasNext()){ + collected.add(stream.next()); + if (collected.size()>=Constants.INPUT_BUNCH){ + if (!client.sendInput(locatorId, collected)) + throw new RuntimeException(); + collected.clear(); + } + } + if (collected.size()>0) + if (!client.sendInput(locatorId, collected)) + throw new RuntimeException(); + client.sendInput(locatorId, new ArrayList(0)); + } + }; + thread.start(); + } } diff --git a/src/main/java/org/gcube/data/spd/client/proxies/OccurrenceClient.java b/src/main/java/org/gcube/data/spd/client/proxies/OccurrenceClient.java index 238aef4..57ff682 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/OccurrenceClient.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/OccurrenceClient.java @@ -3,7 +3,6 @@ package org.gcube.data.spd.client.proxies; import java.util.List; -import org.gcube.data.spd.model.PointInfo; import org.gcube.data.spd.model.products.OccurrencePoint; import org.gcube.data.streams.Stream; @@ -11,8 +10,6 @@ public interface OccurrenceClient { public Stream getByIds(List ids); - public String createLayer(Stream coordinatesLocator); - public Stream getByKeys(List keys); } diff --git a/src/main/java/org/gcube/data/spd/client/proxies/ResultSetClient.java b/src/main/java/org/gcube/data/spd/client/proxies/ResultSetClient.java index 44d1ce1..f20c1f1 100644 --- a/src/main/java/org/gcube/data/spd/client/proxies/ResultSetClient.java +++ b/src/main/java/org/gcube/data/spd/client/proxies/ResultSetClient.java @@ -7,5 +7,5 @@ public interface ResultSetClient { ChunkedInput getResultSet(String locator); void closeResultSet(String locator); - + }