Downgrade to legacy storage implementation
This commit is contained in:
parent
f1c2e4eb7a
commit
9eab028bab
|
@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class StorageUtils {
|
public class StorageUtils {
|
||||||
|
|
||||||
public static final IClient getClient(){
|
private static final IClient getClient(){
|
||||||
return new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME,
|
return new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME,
|
||||||
ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ public class StorageUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void forceClose(){
|
public void forceClose(){
|
||||||
client.forceClose();
|
// client.forceClose();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TempFile putOntoStorage(InputStream source,String filename) throws RemoteBackendException, FileNotFoundException{
|
public TempFile putOntoStorage(InputStream source,String filename) throws RemoteBackendException, FileNotFoundException{
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
package org.gcube.application.geoportal.common;
|
||||||
|
|
||||||
|
import com.mongodb.MongoWaitQueueFullException;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.gcube.application.geoportal.common.rest.InterfaceConstants;
|
||||||
|
import org.gcube.application.geoportal.common.utils.ContextUtils;
|
||||||
|
import org.gcube.application.geoportal.common.utils.StorageUtils;
|
||||||
|
import org.gcube.contentmanagement.blobstorage.service.IClient;
|
||||||
|
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
|
||||||
|
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
|
||||||
|
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class StorageUtilsTest {
|
||||||
|
|
||||||
|
IClient client= null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init(){
|
||||||
|
TokenSetter.set("/gcube/devsec/devVRE");
|
||||||
|
client=new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME,
|
||||||
|
ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getFileID() throws FileNotFoundException {
|
||||||
|
return client.put(true).LFile(
|
||||||
|
new FileInputStream("../test-data/concessioni/relazione.pdf")).
|
||||||
|
RFile(StorageUtils.getUniqueString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getURL(String id){
|
||||||
|
return client.getHttpsUrl().RFile(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParallelStorage() throws FileNotFoundException, InterruptedException {
|
||||||
|
|
||||||
|
ExecutorService service = Executors.newFixedThreadPool(10);
|
||||||
|
LocalDateTime start=LocalDateTime.now();
|
||||||
|
AtomicLong executed = new AtomicLong(0);
|
||||||
|
AtomicLong launched = new AtomicLong(0);
|
||||||
|
|
||||||
|
String id=getFileID();
|
||||||
|
//for 100 secs
|
||||||
|
while(Duration.between(start,LocalDateTime.now()).
|
||||||
|
compareTo(Duration.of(100, ChronoUnit.SECONDS))<0){
|
||||||
|
service.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
System.out.println(getURL(id));
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.info("Too many connections... ");
|
||||||
|
}finally{
|
||||||
|
executed.incrementAndGet();
|
||||||
|
try {Thread.sleep(500);} catch (InterruptedException i) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
launched.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!service.awaitTermination(2, TimeUnit.MINUTES)) {
|
||||||
|
log.info("Waiting .. completed {}, out of {} ",executed.get(),launched.get());
|
||||||
|
if(executed.get()==launched.get()) service.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerialStorage() throws FileNotFoundException {
|
||||||
|
TokenSetter.set("/gcube/devsec/devVRE");
|
||||||
|
//get client
|
||||||
|
client=new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME,
|
||||||
|
ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
||||||
|
|
||||||
|
//put file
|
||||||
|
String id= client.put(true).LFile(
|
||||||
|
new FileInputStream("../test-data/concessioni/relazione.pdf")).
|
||||||
|
RFile(StorageUtils.getUniqueString());
|
||||||
|
|
||||||
|
for(int i = 0; i<1000;i++){
|
||||||
|
//get ID
|
||||||
|
System.out.println(client.getHttpsUrl().RFile(id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
package org.gcube.application.geoportal.common;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||||
|
import org.gcube.common.scope.api.ScopeProvider;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class TokenSetter {
|
||||||
|
|
||||||
|
|
||||||
|
private static Properties props=new Properties();
|
||||||
|
|
||||||
|
static{
|
||||||
|
try {
|
||||||
|
props.load(TokenSetter.class.getResourceAsStream("/tokens.properties"));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("YOU NEED TO SET TOKEN FILE IN CONFIGURATION");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static void set(String scope){
|
||||||
|
try{
|
||||||
|
if(!props.containsKey(scope)) throw new RuntimeException("No token found for scope : "+scope);
|
||||||
|
SecurityTokenProvider.instance.set(props.getProperty(scope));
|
||||||
|
}catch(Throwable e){
|
||||||
|
log.warn("Unable to set token for scope "+scope,e);
|
||||||
|
}
|
||||||
|
ScopeProvider.instance.set(scope);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,23 +1,10 @@
|
||||||
package org.gcube.application.geoportal.service.engine.providers;
|
package org.gcube.application.geoportal.service.engine.providers;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.gcube.application.geoportal.common.rest.InterfaceConstants;
|
|
||||||
import org.gcube.application.geoportal.common.utils.StorageUtils;
|
import org.gcube.application.geoportal.common.utils.StorageUtils;
|
||||||
import org.gcube.application.geoportal.service.model.internal.faults.ConfigurationException;
|
import org.gcube.application.geoportal.service.model.internal.faults.ConfigurationException;
|
||||||
import org.gcube.application.geoportal.service.utils.ContextUtils;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.service.IClient;
|
|
||||||
import org.gcube.contentmanagement.blobstorage.transport.backend.RemoteBackendException;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
|
|
||||||
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
|
|
||||||
import org.gcube.data.transfer.library.utils.Utils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.temporal.ChronoUnit;
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class StorageClientProvider extends AbstractScopedMap<StorageUtils> {
|
public class StorageClientProvider extends AbstractScopedMap<StorageUtils> {
|
||||||
|
|
||||||
|
@ -30,7 +17,11 @@ public class StorageClientProvider extends AbstractScopedMap<StorageUtils> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StorageUtils retrieveObject() throws ConfigurationException {
|
protected StorageUtils retrieveObject() throws ConfigurationException {
|
||||||
return new StorageUtils();
|
try{
|
||||||
|
return new StorageUtils();
|
||||||
|
}catch(Throwable t){
|
||||||
|
throw new ConfigurationException("unable to get Storage",t);
|
||||||
|
}
|
||||||
// return new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME, ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
// return new StorageClient(InterfaceConstants.SERVICE_CLASS, InterfaceConstants.SERVICE_NAME, ContextUtils.getCurrentCaller(), AccessType.SHARED, MemoryType.VOLATILE).getClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,15 +1,21 @@
|
||||||
package org.gcube.application.geoportal.service.engine.caches;
|
package org.gcube.application.geoportal.service.engine.caches;
|
||||||
|
|
||||||
|
import ch.qos.logback.core.net.SyslogOutputStream;
|
||||||
import com.mongodb.MongoWaitQueueFullException;
|
import com.mongodb.MongoWaitQueueFullException;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.gcube.application.cms.tests.TokenSetter;
|
import org.gcube.application.cms.tests.TokenSetter;
|
||||||
|
import org.gcube.application.cms.tests.model.TestModel;
|
||||||
|
import org.gcube.application.geoportal.common.utils.StorageUtils;
|
||||||
import org.gcube.application.geoportal.service.BasicServiceTestUnit;
|
import org.gcube.application.geoportal.service.BasicServiceTestUnit;
|
||||||
|
import org.gcube.application.geoportal.service.engine.ImplementationProvider;
|
||||||
import org.gcube.application.geoportal.service.engine.mongo.ConcessioniMongoManager;
|
import org.gcube.application.geoportal.service.engine.mongo.ConcessioniMongoManager;
|
||||||
import org.gcube.application.geoportal.service.engine.providers.AbstractScopedMap;
|
import org.gcube.application.geoportal.service.engine.providers.AbstractScopedMap;
|
||||||
import org.gcube.application.geoportal.service.engine.providers.TTLObject;
|
import org.gcube.application.geoportal.service.engine.providers.TTLObject;
|
||||||
import org.gcube.application.geoportal.service.model.internal.faults.ConfigurationException;
|
import org.gcube.application.geoportal.service.model.internal.faults.ConfigurationException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
@ -98,5 +104,44 @@ public class Caches extends BasicServiceTestUnit {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStorageCache() throws ConfigurationException, FileNotFoundException, InterruptedException {
|
||||||
|
TokenSetter.set(scope);
|
||||||
|
ExecutorService service = Executors.newFixedThreadPool(10);
|
||||||
|
LocalDateTime start=LocalDateTime.now();
|
||||||
|
AtomicLong executed = new AtomicLong(0);
|
||||||
|
AtomicLong launched = new AtomicLong(0);
|
||||||
|
|
||||||
|
final StorageUtils storage=ImplementationProvider.get().getStorageProvider().getObject();
|
||||||
|
|
||||||
|
String id =storage.putOntoStorage(new File(TestModel.getBaseFolder(),"relazione.pdf"))[0].getId();
|
||||||
|
|
||||||
|
//for 100 secs
|
||||||
|
while(Duration.between(start,LocalDateTime.now()).
|
||||||
|
compareTo(Duration.of(100, ChronoUnit.SECONDS))<0){
|
||||||
|
service.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// System.out.println(ImplementationProvider.get().getStorageProvider().getObject().getURL(id));
|
||||||
|
storage.getURL(id);
|
||||||
|
// } catch (ConfigurationException e) {
|
||||||
|
// e.printStackTrace();
|
||||||
|
} catch (MongoWaitQueueFullException e) {
|
||||||
|
log.info("Too many connections... ");
|
||||||
|
}finally{
|
||||||
|
executed.incrementAndGet();
|
||||||
|
try {Thread.sleep(500);} catch (InterruptedException i) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
launched.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!service.awaitTermination(2, TimeUnit.MINUTES)) {
|
||||||
|
log.info("Waiting .. completed {}, out of {} ",executed.get(),launched.get());
|
||||||
|
if(executed.get()==launched.get()) service.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
5
pom.xml
5
pom.xml
|
@ -107,12 +107,13 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.contentmanagement</groupId>
|
<groupId>org.gcube.contentmanagement</groupId>
|
||||||
<artifactId>storage-manager-core</artifactId>
|
<artifactId>storage-manager-core</artifactId>
|
||||||
<version>[2.0.0, 3.0.0-SNAPSHOT)</version>
|
|
||||||
|
<version>[2.0.0, 2.9.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.contentmanagement</groupId>
|
<groupId>org.gcube.contentmanagement</groupId>
|
||||||
<artifactId>storage-manager-wrapper</artifactId>
|
<artifactId>storage-manager-wrapper</artifactId>
|
||||||
<version>[2.0.0, 3.0.0-SNAPSHOT)</version>
|
<version>[2.0.0, 2.9.0-SNAPSHOT)</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class ConcessionPublisherThread implements Runnable{
|
||||||
private ConcurrentLinkedQueue<Concessione> error=new ConcurrentLinkedQueue<>();
|
private ConcurrentLinkedQueue<Concessione> error=new ConcurrentLinkedQueue<>();
|
||||||
private ConcurrentLinkedQueue<Concessione> success=new ConcurrentLinkedQueue<>();
|
private ConcurrentLinkedQueue<Concessione> success=new ConcurrentLinkedQueue<>();
|
||||||
private ConcurrentLinkedQueue<Concessione> warning=new ConcurrentLinkedQueue<>();
|
private ConcurrentLinkedQueue<Concessione> warning=new ConcurrentLinkedQueue<>();
|
||||||
private ConcurrentLinkedQueue<Concessione> noReport=new ConcurrentLinkedQueue<>();
|
private ConcurrentLinkedQueue<String> noReport=new ConcurrentLinkedQueue<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ConcessionPublisherThread.Report report=new ConcessionPublisherThread.Report();
|
private static ConcessionPublisherThread.Report report=new ConcessionPublisherThread.Report();
|
||||||
|
@ -106,7 +106,7 @@ public class ConcessionPublisherThread implements Runnable{
|
||||||
|
|
||||||
}catch(Throwable t){
|
}catch(Throwable t){
|
||||||
log.error("Problematic entry "+this,t);
|
log.error("Problematic entry "+this,t);
|
||||||
report.getNoReport().add(c);
|
report.getNoReport().add(projectName + baseDir);
|
||||||
}finally{
|
}finally{
|
||||||
log.info("Completed N {}", completed.incrementAndGet());
|
log.info("Completed N {}", completed.incrementAndGet());
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,9 @@ public class MockFromFolder {
|
||||||
System.out.println("WARNING "+report.getWarning().size());
|
System.out.println("WARNING "+report.getWarning().size());
|
||||||
report.getWarning().forEach(concessionePrinter);
|
report.getWarning().forEach(concessionePrinter);
|
||||||
System.out.println("NO REPORT "+report.getNoReport().size());
|
System.out.println("NO REPORT "+report.getNoReport().size());
|
||||||
report.getNoReport().forEach(concessionePrinter);
|
report.getNoReport().forEach(s -> {
|
||||||
|
System.out.println(s);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue