test fixes
parent
eb1c33f8c3
commit
c29a561efd
@ -1 +1,2 @@
|
||||
/org.eclipse.jdt.core.prefs
|
||||
/org.eclipse.core.resources.prefs
|
||||
|
@ -1,50 +0,0 @@
|
||||
package org.gcube.usecases.ws.thredds;
|
||||
|
||||
import static org.gcube.common.authorization.client.Constants.authorizationService;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.gcube.common.authorization.library.AuthorizationEntry;
|
||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||
import org.gcube.common.scope.api.ScopeProvider;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TokenSetter {
|
||||
|
||||
private static Properties props=null;
|
||||
|
||||
static{
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static synchronized void set(String scope){
|
||||
if(props==null) {
|
||||
props=new Properties();
|
||||
try {
|
||||
props.load(TokenSetter.class.getResourceAsStream("/tokens.properties"));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("YOU NEED TO SET TOKEN FILE IN CONFIGURATION");
|
||||
}
|
||||
}
|
||||
if(!props.containsKey(scope)) throw new RuntimeException("No token found for scope : "+scope);
|
||||
SecurityTokenProvider.instance.set(props.getProperty(scope));
|
||||
}
|
||||
|
||||
|
||||
public static void setToken(String token){
|
||||
try{
|
||||
AuthorizationEntry entry = authorizationService().get(token);
|
||||
ScopeProvider.instance.set(entry.getContext());
|
||||
SecurityTokenProvider.instance.set(token);
|
||||
}catch(Throwable t) {
|
||||
throw new RuntimeException("Unable to set token "+token,t);
|
||||
}
|
||||
}
|
||||
|
||||
public static String getCurrentToken() {
|
||||
return SecurityTokenProvider.instance.get();
|
||||
}
|
||||
}
|
@ -1,208 +0,0 @@
|
||||
package org.gcube.usecases.ws.thredds.engine;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||
import org.gcube.data.transfer.library.DataTransferClient;
|
||||
import org.gcube.data.transfer.library.TransferResult;
|
||||
import org.gcube.data.transfer.library.faults.DestinationNotSetException;
|
||||
import org.gcube.data.transfer.library.faults.FailedTransferException;
|
||||
import org.gcube.data.transfer.library.faults.InitializationException;
|
||||
import org.gcube.data.transfer.library.faults.InvalidDestinationException;
|
||||
import org.gcube.data.transfer.library.faults.InvalidSourceException;
|
||||
import org.gcube.data.transfer.library.faults.ServiceNotFoundException;
|
||||
import org.gcube.data.transfer.library.faults.SourceNotSetException;
|
||||
import org.gcube.data.transfer.library.faults.UnreachableNodeException;
|
||||
import org.gcube.data.transfer.model.Destination;
|
||||
import org.gcube.data.transfer.model.DestinationClashPolicy;
|
||||
import org.gcube.data.transfer.model.PluginInvocation;
|
||||
import org.gcube.spatial.data.sdi.model.metadata.MetadataReport;
|
||||
import org.gcube.spatial.data.sdi.utils.ScopeUtils;
|
||||
import org.gcube.usecases.ws.thredds.Commons;
|
||||
import org.gcube.usecases.ws.thredds.NetUtils;
|
||||
import org.gcube.usecases.ws.thredds.TokenSetter;
|
||||
import org.gcube.usecases.ws.thredds.engine.PublishRequest.Mode;
|
||||
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class PublishThread implements Runnable {
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@NonNull
|
||||
private PublishRequest request;
|
||||
@NonNull
|
||||
private ConcurrentHashMap<String,PublishReport> reports;
|
||||
|
||||
|
||||
private PublishReport publishReport;
|
||||
// private Map<String,Report> reports;
|
||||
//
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
log.info("Request is {}",request);
|
||||
log.debug("Switching from {} to {}",SecurityTokenProvider.instance.get(),request.getPublishToken());
|
||||
TokenSetter.setToken(request.getPublishToken());
|
||||
log.debug("Current scope is :{}, token is {} ",ScopeUtils.getCurrentScope(),SecurityTokenProvider.instance.get());
|
||||
|
||||
Destination dest=new Destination();
|
||||
dest.setPersistenceId("thredds");
|
||||
dest.setSubFolder("public/netcdf/"+request.getCatalog());
|
||||
dest.setOnExistingFileName(DestinationClashPolicy.REWRITE);
|
||||
dest.setCreateSubfolders(true);
|
||||
dest.setOnExistingSubFolder(DestinationClashPolicy.APPEND);
|
||||
String threddsHostName;
|
||||
|
||||
|
||||
try {
|
||||
threddsHostName = Commons.getThreddsHost();
|
||||
|
||||
DataTransferClient client=Commons.getDTClient(threddsHostName);
|
||||
|
||||
|
||||
File toPublishSource=null;
|
||||
|
||||
if(request.getMode().equals(Mode.NCML)) {
|
||||
if(request.isQueue()){
|
||||
log.debug("Waiting for queue {}, expected Count {} ",request.getQueueId(),request.getQueueCount());
|
||||
waitFor(request.getQueueId(), request.getQueueCount());
|
||||
|
||||
log.debug("Loading netcdfFile ..");
|
||||
File ncmlFile=NetUtils.download(new URL(request.getSource().getUrl()));
|
||||
String toUpdateSource=new String(Files.readAllBytes(ncmlFile.toPath()));
|
||||
|
||||
for(String reportId:request.getToGatherReportsId()) {
|
||||
PublishReport report=getReport(reportId);
|
||||
//file://home/gcube/julien.barde/Workspace/DataMiner/Output_Data_Sets/Ichthyop2013.nc
|
||||
|
||||
String toSetUrl="file:/"+report.getTransferResult().getRemotePath();
|
||||
toUpdateSource=toUpdateSource.replaceAll(reportId, toSetUrl);
|
||||
}
|
||||
|
||||
toPublishSource=File.createTempFile("nc_", ".ncml");
|
||||
PrintWriter out = new PrintWriter(toPublishSource);
|
||||
out.write(toUpdateSource);
|
||||
out.flush();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
TransferResult result;
|
||||
|
||||
|
||||
// TODO NB Test run without metadata publication
|
||||
// result=client.httpSource(request.getSource().getUrl(), dest);
|
||||
// publishReport=new PublishReport(false,request.getSource().getId(),result,null);
|
||||
|
||||
|
||||
if(!request.isGenerateMeta()) {
|
||||
log.debug("Transfering before publishing meta..");
|
||||
|
||||
result = toPublishSource==null?client.httpSource(request.getSource().getUrl(), dest):
|
||||
client.localFile(toPublishSource, dest);
|
||||
|
||||
//NB DECOMMENT THIS!!!!!!
|
||||
// Metadata meta=SDIAbstractPlugin.metadata().build();
|
||||
//
|
||||
// log.debug("Publishing metadata.. ");
|
||||
//
|
||||
// MetadataPublishOptions opts=new MetadataPublishOptions(new TemplateInvocationBuilder().threddsOnlineResources(threddsHostName, request.getSource().getName(), request.getCatalog()).get());
|
||||
// opts.setGeonetworkCategory("Datasets");
|
||||
// MetadataReport report=meta.pushMetadata(request.getMetadata(), opts);
|
||||
|
||||
// publishReport=new PublishReport(false,request.getSource().getId(),request.getSource().getName(),result,report);
|
||||
publishReport=new PublishReport(false,request.getSource().getId(),request.getSource().getName(),result,new MetadataReport());
|
||||
|
||||
|
||||
}else {
|
||||
log.debug("Metadata not provided.. ");
|
||||
if(request.isQueue()&&request.getMode().equals(Mode.NC)) {
|
||||
log.debug("Dataset file is linked in ncml, skipping metadata generation");
|
||||
result=client.httpSource(request.getSource().getUrl(), dest);
|
||||
}else
|
||||
result=client.httpSource(request.getSource().getUrl(), dest,new PluginInvocation("SIS/GEOTK"));
|
||||
|
||||
|
||||
publishReport=new PublishReport(false,request.getSource().getId(),request.getSource().getName(),result,null);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
} catch (UnreachableNodeException | ServiceNotFoundException e) {
|
||||
log.error("Unable to find Thredds. Publish scope is {} ",ScopeUtils.getCurrentScope(),e);
|
||||
} catch (InvalidSourceException | SourceNotSetException | FailedTransferException | InitializationException
|
||||
| InvalidDestinationException | DestinationNotSetException e) {
|
||||
log.error("Unable to transfer file, ",e);
|
||||
} catch (IOException e) {
|
||||
log.error("Unable to read/ write file. ",e);
|
||||
}
|
||||
|
||||
onCompletion();
|
||||
}
|
||||
|
||||
|
||||
private void onCompletion() {
|
||||
if(publishReport==null) publishReport=new PublishReport(true, request.getSource().getId(),request.getSource().getName(), null, null);
|
||||
publishReport(publishReport);
|
||||
|
||||
if(request.getMode().equals(Mode.NC)&&(request.isQueue())) {
|
||||
alert(request.getQueueId(),request.getQueueCount());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private PublishReport getReport(String reportId) {
|
||||
return reports.get(reportId);
|
||||
}
|
||||
|
||||
|
||||
private void publishReport(PublishReport report) {
|
||||
reports.put(report.getSourceId(), report);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// static
|
||||
|
||||
|
||||
private static ConcurrentHashMap<String,Semaphore> semaphores=new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
|
||||
private static void waitFor(String queueId,Integer expected) {
|
||||
try {
|
||||
log.debug("Waiting for queue {}. Expected Count is {} ",queueId,expected);
|
||||
semaphores.getOrDefault(queueId, new Semaphore(expected*-1)).acquire();
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Queue {} is completed.");
|
||||
}
|
||||
}
|
||||
|
||||
private static void alert(String queueId, Integer expected) {
|
||||
log.debug("Alerting queue {}. Expected count is {} ",queueId,expected);
|
||||
Semaphore sem=semaphores.getOrDefault(queueId, new Semaphore(expected*-1));
|
||||
sem.release();
|
||||
log.debug(String.format("Queue %1$s alerted. Remaining : %2$s out of %3$s ",queueId,sem.availablePermits(),expected));
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
@ -1,102 +0,0 @@
|
||||
package org.gcube.usecases.ws.thredds.engine;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.gcube.usecases.ws.thredds.FolderConfiguration;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.Synchronized;
|
||||
|
||||
public class TransferRequestServer {
|
||||
|
||||
@Data
|
||||
public static class Report{
|
||||
private AtomicLong requestCount=new AtomicLong(0l);
|
||||
private AtomicLong requestServed=new AtomicLong(0l);
|
||||
private ConcurrentHashMap<String,PublishReport> reports=new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
public File toFile(FolderConfiguration configuration) {
|
||||
return toFile(configuration,this);
|
||||
}
|
||||
|
||||
|
||||
private static final File toFile(FolderConfiguration config,Report report) {
|
||||
PrintWriter writer =null;
|
||||
try {
|
||||
File toReturn=File.createTempFile("tempFile", ".tmp");
|
||||
writer=new PrintWriter(toReturn);
|
||||
|
||||
writer.println("REPORT FOR WS-SYNCH");
|
||||
writer.println("Configuratiion was : "+config);
|
||||
writer.println("Submitted runs : "+report.getRequestCount());
|
||||
writer.println("Item reports : ");
|
||||
for(Entry<String,PublishReport> entry: report.getReports().entrySet()) {
|
||||
PublishReport rep=entry.getValue();
|
||||
writer.println("*********************************************************");
|
||||
if(rep.isError()) writer.println("OPERATION IS FAILED");
|
||||
writer.println("ITEM ID : "+rep.getSourceId());
|
||||
writer.println("ITEM NAME : "+rep.getSourceName());
|
||||
if(rep.getTransferResult()!=null)writer.println("Transfer report : "+rep.getTransferResult());
|
||||
if(rep.getMetadataReport()!=null)writer.println("Metadata report : "+rep.getMetadataReport());
|
||||
}
|
||||
return toReturn;
|
||||
}catch(Throwable t) {
|
||||
throw new RuntimeException(t);
|
||||
}finally {
|
||||
if(writer!=null) {
|
||||
IOUtils.closeQuietly(writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Report report=new Report();
|
||||
private ExecutorService service=null;
|
||||
|
||||
public TransferRequestServer() {
|
||||
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(
|
||||
100);
|
||||
service= new ThreadPoolExecutor(1, 10, 30,
|
||||
TimeUnit.SECONDS, linkedBlockingDeque,
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
}
|
||||
|
||||
|
||||
public void put(PublishRequest request){
|
||||
System.out.println("Submitting transfer "+getReport().requestCount.incrementAndGet());
|
||||
service.execute(new PublishThread(request, getReport().getReports()));
|
||||
|
||||
// service.execute(new RequestThread(baseUrl,filename,this,publishScope,toPublishMeta));
|
||||
}
|
||||
@Synchronized
|
||||
public Report getReport(){
|
||||
return report;
|
||||
}
|
||||
|
||||
|
||||
public void waitCompletion() {
|
||||
boolean running=true;
|
||||
service.shutdown();
|
||||
while(running){
|
||||
System.out.println("******************* WAITING FOR TERMINATION ***************** ");
|
||||
try{
|
||||
running=!service.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
||||
}catch(InterruptedException e){
|
||||
running=!service.isTerminated();
|
||||
}
|
||||
}
|
||||
System.out.println("Service is completed : "+service.isTerminated());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package org.gcube.usecases.ws.thredds;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||
import org.gcube.usecases.ws.thredds.engine.impl.security.Security;
|
||||
import org.gcube.usecases.ws.thredds.engine.impl.security.User;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TokenSetter {
|
||||
|
||||
private static Properties props=null;
|
||||
|
||||
|
||||
public static synchronized void set(String scope){
|
||||
if(props==null) {
|
||||
props=new Properties();
|
||||
try {
|
||||
props.load(TokenSetter.class.getResourceAsStream("/tokens.properties"));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("YOU NEED TO SET TOKEN FILE IN CONFIGURATION");
|
||||
}
|
||||
}
|
||||
if(!props.containsKey(scope)) throw new RuntimeException("No token found for scope : "+scope);
|
||||
Security.set(new User(null, null, props.getProperty(scope), scope));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue