Lucio Lelii 2017-02-20 15:26:10 +00:00
parent 288a2e5902
commit 78b07ce32b
13 changed files with 123 additions and 204 deletions

View File

@ -26,6 +26,8 @@ public class Constants {
public static final QName RESULTSET_QNAME = new QName(NAMESPACE, "resultset");
public static final int INPUT_BUNCH = 30;
/*
public static final GcubeService<ManagerStubs> manager = service().withName(org.gcube.data.spd.model.service.Constants.manager_name).andInterface(ManagerStubs.class);

View File

@ -43,9 +43,9 @@ public abstract class JerseyRecordIterator<T> implements Iterator<T>, Closeable{
private ChunkedInputReader chunkedInputReader;
public JerseyRecordIterator(ResultLocator locator, long timeout, TimeUnit timeoutUnit) {
this.resultSetClient = AbstractPlugin.resultset().at(locator.getHost(), locator.getPort()).build();
this.locator = locator.getLocator();
public JerseyRecordIterator(String endpointId, String locator, long timeout, TimeUnit timeoutUnit) {
this.resultSetClient = AbstractPlugin.resultset(endpointId).build();
this.locator = locator;
this.timeoutInMillis = timeoutUnit.toMillis(timeout);
}
@ -56,7 +56,6 @@ public abstract class JerseyRecordIterator<T> implements Iterator<T>, Closeable{
if (this.chunkedInput==null)
initializeChunckedInput();
if (chunkedInput.isClosed() && queue.isEmpty()) return false;
try {
long startTime = System.currentTimeMillis();

View File

@ -9,9 +9,9 @@ import org.gcube.data.spd.model.products.ResultElement;
public class ResultElementRecordIterator<T extends ResultElement> extends JerseyRecordIterator<T> {
public ResultElementRecordIterator(ResultLocator locator,
public ResultElementRecordIterator(String endpointId, String locator,
long timeout, TimeUnit timeoutUnit) {
super(locator, timeout, timeoutUnit);
super(endpointId, locator, timeout, timeoutUnit);
}
@Override

View File

@ -1,29 +0,0 @@
package org.gcube.data.spd.client;
public class ResultLocator {
private String host;
private int port;
private String locator;
public ResultLocator(String host, int port, String locator) {
super();
this.host = host;
this.port = port;
this.locator = locator;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getLocator() {
return locator;
}
}

View File

@ -2,6 +2,7 @@ package org.gcube.data.spd.client.plugins;
import javax.ws.rs.client.WebTarget;
import org.gcube.common.clients.LegacyQuery;
import org.gcube.common.clients.Plugin;
import org.gcube.common.clients.ProxyBuilder;
import org.gcube.common.clients.ProxyBuilderImpl;
@ -39,8 +40,10 @@ public abstract class AbstractPlugin<S,P> implements Plugin<S,P> {
return new ProxyBuilderImpl<WebTarget,ExecutorClient>(executor_plugin);
}
public static ProxyBuilder<ResultSetClient> resultset() {
return new ProxyBuilderImpl<WebTarget,ResultSetClient>(resultset_plugin);
public static ProxyBuilder<ResultSetClient> resultset(String endpointId) {
LegacyQuery query = new LegacyQuery(resultset_plugin);
query.addCondition("$resource/ID/string() eq '"+endpointId+"'");
return new ProxyBuilderImpl<WebTarget,ResultSetClient>(resultset_plugin, query);
}
public final String name;

View File

@ -9,12 +9,11 @@ import javax.ws.rs.core.Response;
import org.gcube.common.clients.Call;
import org.gcube.common.clients.delegates.ProxyDelegate;
import org.gcube.data.spd.client.ResultElementRecordIterator;
import org.gcube.data.spd.client.ResultLocator;
import org.gcube.data.spd.client.Utils;
import org.gcube.data.spd.model.products.TaxonomyItem;
import org.gcube.data.spd.model.service.exceptions.InvalidIdentifierException;
import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException;
import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException;
import org.gcube.data.spd.model.service.types.MultiLocatorResponse;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.dsl.Streams;
@ -30,18 +29,16 @@ public class DefaultClassification implements ClassificationClient{
public Stream<TaxonomyItem> getTaxonChildrenById(final String id)
throws UnsupportedPluginException, UnsupportedCapabilityException,
InvalidIdentifierException {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
Response response = manager.path("children").path(id).request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
return Streams.convert(ri);
}catch(Exception e) {
throw new RuntimeException(e);
@ -68,18 +65,16 @@ public class DefaultClassification implements ClassificationClient{
public Stream<TaxonomyItem> getTaxonTreeById(final String id)
throws UnsupportedPluginException, UnsupportedCapabilityException,
InvalidIdentifierException {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
Response response = manager.path("tree").path(id).request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
return Streams.convert(ri);
}catch(Exception e) {
@ -91,18 +86,16 @@ public class DefaultClassification implements ClassificationClient{
public Stream<TaxonomyItem> getSynonymsById(final String id)
throws UnsupportedPluginException, UnsupportedCapabilityException,
InvalidIdentifierException {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
Response response = manager.path("synonyms").path(id).request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
ResultElementRecordIterator<TaxonomyItem> ri = new ResultElementRecordIterator<TaxonomyItem>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
return Streams.convert(ri);
}catch(Exception e) {

View File

@ -1,8 +1,5 @@
package org.gcube.data.spd.client.proxies;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@ -14,7 +11,7 @@ import org.gcube.data.spd.model.service.types.CompleteJobStatus;
import org.gcube.data.spd.model.service.types.JobType;
import org.gcube.data.spd.model.service.types.MetadataDetails;
import org.gcube.data.spd.model.service.types.SubmitJob;
import org.gcube.data.spd.model.util.SerializableList;
import org.gcube.data.spd.model.service.types.SubmitJobResponse;
import org.gcube.data.streams.Stream;
import com.thoughtworks.xstream.XStream;
@ -28,12 +25,12 @@ public class DefaultExecutor implements ExecutorClient{
this.delegate = delegate;
}
private Call<WebTarget, String> getCallForJobs(final String input, final JobType job){
Call<WebTarget, String> call = new Call<WebTarget, String>() {
private Call<WebTarget, SubmitJobResponse> getCallForJobs(final String input, final JobType job){
Call<WebTarget, SubmitJobResponse> call = new Call<WebTarget, SubmitJobResponse>() {
@Override
public String call(WebTarget executor) throws Exception {
public SubmitJobResponse call(WebTarget executor) throws Exception {
SubmitJob jobRequest = new SubmitJob(input, job);
return executor.path("execute").request().post(Entity.xml(jobRequest), String.class);
return executor.path("execute").request().post(Entity.xml(jobRequest), SubmitJobResponse.class);
}
};
return call;
@ -103,99 +100,67 @@ public class DefaultExecutor implements ExecutorClient{
}
private Boolean sendInputCall(final String jobId, final List<String> input)
throws InvalidIdentifierException {
Call<WebTarget, Boolean> call = new Call<WebTarget, Boolean>() {
@Override
public Boolean call(WebTarget executor) throws Exception {
return executor.path("input").path(jobId).request().put(Entity.xml(new SerializableList<String>(input)), Boolean.class);
}
};
try {
return delegate.make(call);
}catch(Exception e) {
throw new InvalidIdentifierException();
}
}
private void sendInput(String jobId, Stream<String> stream) throws Exception{
int bunch = 30;
List<String> collected = new ArrayList<String>(30);
while (stream.hasNext()){
collected.add(stream.next());
if (collected.size()>=bunch){
if (!sendInputCall(jobId, collected))
throw new Exception();
collected.clear();
}
}
if (collected.size()>0)
if (!sendInputCall(jobId, collected))
throw new Exception();
sendInputCall(jobId, new ArrayList<String>(0));
}
@Override
public String createDwCAByChildren(String taxonKey) throws Exception {
return delegate.make(getCallForJobs(taxonKey, JobType.DWCAByChildren));
return delegate.make(getCallForJobs(taxonKey, JobType.DWCAByChildren)).getJobId();
}
@Override
public String createDwCAByIds(Stream<String> ids) throws Exception {
String jobId = delegate.make(getCallForJobs(null, JobType.DWCAById));
SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.DWCAById));
try{
sendInput(jobId, ids);
DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids);
}catch(Exception e){
e.printStackTrace();
}
return jobId;
return response.getJobId();
}
@Override
public String createCSV(Stream<String> ids) throws Exception {
String jobId = delegate.make(getCallForJobs(null, JobType.CSV));
SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.CSV));
try{
sendInput(jobId, ids);
DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids);
}catch(Exception e){
e.printStackTrace();
}
return jobId;
return response.getJobId();
}
@Override
public String createLayer(Stream<String> keys, MetadataDetails metadata) throws Exception {
String jobId = delegate.make(getCallForJobs(new XStream().toXML(metadata), JobType.LayerCreator));
SubmitJobResponse response = delegate.make(getCallForJobs(new XStream().toXML(metadata), JobType.LayerCreator));
try{
sendInput(jobId, keys);
DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), keys);
}catch(Exception e){
e.printStackTrace();
}
return jobId;
return response.getJobId();
}
@Override
public String createCSVforOM(Stream<String> ids) throws Exception {
String jobId = delegate.make(getCallForJobs(null, JobType.CSVForOM));
SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.CSVForOM));
try{
sendInput(jobId, ids);
DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids);
}catch(Exception e){
e.printStackTrace();
}
return jobId;
return response.getJobId();
}
@Override
public String createDarwincoreFromOccurrenceKeys(Stream<String> ids)
throws Exception {
String jobId = delegate.make(getCallForJobs(null, JobType.DarwinCore));
SubmitJobResponse response = delegate.make(getCallForJobs(null, JobType.DarwinCore));
try{
sendInput(jobId, ids);
DefaultResultSet.sendInput(response.getEndpointId(), response.getInputLocator(), ids);
}catch(Exception e){
e.printStackTrace();
}
return jobId;
return response.getJobId();
}
}

View File

@ -9,13 +9,12 @@ import javax.ws.rs.core.Response;
import org.gcube.common.clients.Call;
import org.gcube.common.clients.delegates.ProxyDelegate;
import org.gcube.data.spd.client.ResultElementRecordIterator;
import org.gcube.data.spd.client.ResultLocator;
import org.gcube.data.spd.client.Utils;
import org.gcube.data.spd.model.PluginDescription;
import org.gcube.data.spd.model.exceptions.InvalidQueryException;
import org.gcube.data.spd.model.products.ResultElement;
import org.gcube.data.spd.model.service.exceptions.UnsupportedCapabilityException;
import org.gcube.data.spd.model.service.exceptions.UnsupportedPluginException;
import org.gcube.data.spd.model.service.types.MultiLocatorResponse;
import org.gcube.data.spd.model.service.types.PluginDescriptions;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.dsl.Streams;
@ -33,19 +32,17 @@ public class DefaultManager implements ManagerClient {
public <T extends ResultElement> Stream<T> search(final String query)
throws InvalidQueryException, UnsupportedPluginException,
UnsupportedCapabilityException {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
Response response = manager.path("search").queryParam("query", query).request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<T> ri = new ResultElementRecordIterator<T>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
System.out.println("MULTILOCACATOR IS "+result);
ResultElementRecordIterator<T> ri = new ResultElementRecordIterator<T>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
return Streams.convert(ri);
}catch(Exception e) {
throw new RuntimeException(e);

View File

@ -9,10 +9,8 @@ import javax.ws.rs.core.Response;
import org.gcube.common.clients.Call;
import org.gcube.common.clients.delegates.ProxyDelegate;
import org.gcube.data.spd.client.ResultElementRecordIterator;
import org.gcube.data.spd.client.ResultLocator;
import org.gcube.data.spd.client.Utils;
import org.gcube.data.spd.model.PointInfo;
import org.gcube.data.spd.model.products.OccurrencePoint;
import org.gcube.data.spd.model.service.types.MultiLocatorResponse;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.dsl.Streams;
@ -26,23 +24,18 @@ public class DefaultOccurrence implements OccurrenceClient {
@Override
public Stream<OccurrencePoint> getByIds(final List<String> ids) {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
manager = manager.path("ids");
for (String value: ids)
manager = manager.queryParam("ids", value);
System.out.println(manager.getUri().toString());
Response response = manager.request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<OccurrencePoint> ri = new ResultElementRecordIterator<OccurrencePoint>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
ResultElementRecordIterator<OccurrencePoint> ri = new ResultElementRecordIterator<OccurrencePoint>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
DefaultResultSet.sendInput(result.getEndpointId(), result.getOutputLocator(), Streams.convert(ids));
return Streams.convert(ri);
}catch(Exception e) {
throw new RuntimeException(e);
@ -52,66 +45,23 @@ public class DefaultOccurrence implements OccurrenceClient {
@Override
public Stream<OccurrencePoint> getByKeys(final List<String> keys) {
Call<WebTarget, ResultLocator> call = new Call<WebTarget, ResultLocator>() {
Call<WebTarget, MultiLocatorResponse> call = new Call<WebTarget, MultiLocatorResponse>() {
@Override
public ResultLocator call(WebTarget manager) throws Exception {
public MultiLocatorResponse call(WebTarget manager) throws Exception {
manager = manager.path("keys");
for (String value: keys)
manager = manager.queryParam("keys", value);
System.out.println("calling "+manager.getUri().toString());
Response response = manager.request().get(Response.class);
String host = manager.getUri().getHost();
int port = manager.getUri().getPort();
return new ResultLocator(host, port, Utils.getLocatorFromResponse(response));
return response.readEntity(MultiLocatorResponse.class);
}
};
try {
ResultLocator result = delegate.make(call);
ResultElementRecordIterator<OccurrencePoint> ri = new ResultElementRecordIterator<OccurrencePoint>(result, 2, TimeUnit.MINUTES);
MultiLocatorResponse result = delegate.make(call);
ResultElementRecordIterator<OccurrencePoint> ri = new ResultElementRecordIterator<OccurrencePoint>(result.getEndpointId(), result.getInputLocator(), 2, TimeUnit.MINUTES);
DefaultResultSet.sendInput(result.getEndpointId(), result.getOutputLocator(), Streams.convert(keys));
return Streams.convert(ri);
}catch(Exception e) {
throw new RuntimeException(e);
}
}
/*
@Override
public String createLayer(Stream<PointInfo> coordinatesStream) {
Stream<String> convertedStream = pipe(coordinatesStream).through(new Generator<PointInfo, String>() {
@Override
public String yield(PointInfo element) {
try {
return Bindings.toXml(element);
} catch (Exception e) {
throw new StreamSkipSignal();
}
}
});
final String coordinateLocator = publishStringsIn(convertedStream).withDefaults().toString();
Call<OccurrenceStubs, String> call = new Call<OccurrenceStubs, String>() {
@Override
public String call(OccurrenceStubs occurrence) throws Exception {
return occurrence.createLayer(coordinateLocator);
}
};
try {
return delegate.make(call);
}catch(Exception e) {
throw again(e).asServiceException();
}
}
*/
@Override
public String createLayer(Stream<PointInfo> coordinatesLocator) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -1,11 +1,19 @@
package org.gcube.data.spd.client.proxies;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import org.gcube.common.clients.Call;
import org.gcube.common.clients.delegates.ProxyDelegate;
import org.gcube.common.clients.stubs.jaxws.JAXWSUtils.Empty;
import org.gcube.data.spd.client.Constants;
import org.gcube.data.spd.client.plugins.AbstractPlugin;
import org.gcube.data.spd.model.util.SerializableList;
import org.gcube.data.streams.Stream;
import org.glassfish.jersey.client.ChunkedInput;
public class DefaultResultSet implements ResultSetClient {
@ -37,8 +45,8 @@ public class DefaultResultSet implements ResultSetClient {
Call<WebTarget, Empty> call = new Call<WebTarget, Empty>() {
@Override
public Empty call(WebTarget manager) throws Exception {
manager.path(locator).request().delete();
return new Empty();
manager.path(locator).request().delete();
return new Empty();
}
};
try {
@ -48,6 +56,40 @@ public class DefaultResultSet implements ResultSetClient {
}
}
protected boolean sendInput(final String locator, final List<String> input) {
Call<WebTarget, Boolean> call = new Call<WebTarget, Boolean>() {
@Override
public Boolean call(WebTarget manager) throws Exception {
return manager.path(locator).request().put(Entity.xml(new SerializableList<String>(input)), Boolean.class);
}
};
try {
return delegate.make(call);
}catch(Exception e) {
throw new RuntimeException(e);
}
}
protected static void sendInput(final String gCoreEnpointId, final String locatorId, final Stream<String> stream) throws Exception{
Thread thread = new Thread(){
public void run(){
List<String> collected = new ArrayList<String>(10);
DefaultResultSet client = (DefaultResultSet)AbstractPlugin.resultset(gCoreEnpointId).build();
while (stream.hasNext()){
collected.add(stream.next());
if (collected.size()>=Constants.INPUT_BUNCH){
if (!client.sendInput(locatorId, collected))
throw new RuntimeException();
collected.clear();
}
}
if (collected.size()>0)
if (!client.sendInput(locatorId, collected))
throw new RuntimeException();
client.sendInput(locatorId, new ArrayList<String>(0));
}
};
thread.start();
}
}

View File

@ -3,7 +3,6 @@ package org.gcube.data.spd.client.proxies;
import java.util.List;
import org.gcube.data.spd.model.PointInfo;
import org.gcube.data.spd.model.products.OccurrencePoint;
import org.gcube.data.streams.Stream;
@ -11,8 +10,6 @@ public interface OccurrenceClient {
public Stream<OccurrencePoint> getByIds(List<String> ids);
public String createLayer(Stream<PointInfo> coordinatesLocator);
public Stream<OccurrencePoint> getByKeys(List<String> keys);
}