git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/data-publishing/gCat-Feeder-Suite@178673 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
321eacfd38
commit
f0f21c51cc
|
@ -1,15 +1,16 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.gcube.data-publishing.gCat-Feeder</groupId>
|
||||
<artifactId>gCat-Feeder-Suite</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>gCat-Feeder</artifactId>
|
||||
<name>gCat-Feeder</name>
|
||||
<description>Service implementation</description>
|
||||
|
||||
<dependencyManagement>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.gcube.data-publishing.gCat-Feeder</groupId>
|
||||
<artifactId>gCat-Feeder-Suite</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>gCat-Feeder</artifactId>
|
||||
<name>gCat-Feeder</name>
|
||||
<description>Service implementation</description>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.gcube.distribution</groupId>
|
||||
|
@ -21,32 +22,33 @@
|
|||
<dependency>
|
||||
<groupId> org.glassfish.jersey </groupId>
|
||||
<artifactId>jersey-bom</artifactId>
|
||||
<version>2.25.1</version>
|
||||
<!-- <version>2.25.1</version> -->
|
||||
<version>2.14</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.gcube.data-publishing.gCat-Feeder</groupId>
|
||||
<artifactId>collectors-plugin-framework</artifactId>
|
||||
<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.gcube.data-publishing.gCat-Feeder</groupId>
|
||||
<artifactId>catalogue-plugin-framework</artifactId>
|
||||
<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.gcube.core</groupId>
|
||||
<artifactId>common-smartgears-app</artifactId>
|
||||
</dependency>
|
||||
|
@ -54,7 +56,7 @@
|
|||
<groupId>org.gcube.core</groupId>
|
||||
<artifactId>common-smartgears</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
<version>3.0.1</version>
|
||||
|
@ -63,14 +65,14 @@
|
|||
<groupId>javax.ws.rs</groupId>
|
||||
<artifactId>javax.ws.rs-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.containers</groupId>
|
||||
<artifactId>jersey-container-servlet</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- test -->
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
|
||||
|
@ -83,8 +85,19 @@
|
|||
<version>1.0.13</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.ext.cdi</groupId>
|
||||
<artifactId>jersey-cdi1x</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.ext.cdi</groupId>
|
||||
<artifactId>jersey-weld2-se</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -1,5 +1,6 @@
|
|||
package org.gcube.data.publishing.gCatFeeder.service;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
import javax.ws.rs.ApplicationPath;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||
|
@ -25,9 +26,9 @@ public class GCatFeeder extends ResourceConfig{
|
|||
@Override
|
||||
protected void configure() {
|
||||
bind(FeederEngineImpl.class).to(FeederEngine.class);
|
||||
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class);
|
||||
bind(CollectorsManagerImpl.class).to(CollectorsManager.class);
|
||||
bind(ExecutionManagerImpl.class).to(ExecutionManager.class);
|
||||
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class).in(Singleton.class);
|
||||
bind(CollectorsManagerImpl.class).to(CollectorsManager.class).in(Singleton.class);
|
||||
bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class);
|
||||
}
|
||||
};
|
||||
register(binder);
|
||||
|
|
|
@ -2,10 +2,7 @@ package org.gcube.data.publishing.gCatFeeder.service;
|
|||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
|
||||
import org.gcube.smartgears.ApplicationManager;
|
||||
|
||||
public class GCatFeederManager implements ApplicationManager{
|
||||
|
@ -13,20 +10,10 @@ public class GCatFeederManager implements ApplicationManager{
|
|||
ExecutionManager executions;
|
||||
|
||||
|
||||
@Inject
|
||||
CollectorsManager collectors;
|
||||
|
||||
@Inject
|
||||
CatalogueControllersManager controllers;
|
||||
|
||||
@Override
|
||||
public void onInit() {
|
||||
try {
|
||||
collectors.init();
|
||||
controllers.init();
|
||||
} catch (InternalError e) {
|
||||
throw new RuntimeException("Initialization Error",e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -12,8 +12,5 @@ public interface CatalogueControllersManager {
|
|||
public Set<String> getAvailableControllers();
|
||||
public CataloguePlugin getPluginById(String collectorId) throws CataloguePluginNotFound;
|
||||
|
||||
public void init() throws InternalError;
|
||||
|
||||
|
||||
public void initInScope() throws InternalError;
|
||||
}
|
||||
|
|
|
@ -11,8 +11,5 @@ public interface CollectorsManager {
|
|||
public Set<String> getAvailableCollectors();
|
||||
public CollectorPlugin<?> getPluginById(String collectorId) throws CollectorNotFound;
|
||||
|
||||
public void init() throws InternalError;
|
||||
|
||||
|
||||
public void initInScope() throws InternalError;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import java.util.ServiceLoader;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.catalogues.CataloguePlugin;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||
|
@ -15,7 +15,6 @@ import org.gcube.data.publishing.gCatFeeder.utils.ContextUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Singleton
|
||||
public class CatalogueControllersManagerImpl implements CatalogueControllersManager {
|
||||
|
||||
private static final Logger log= LoggerFactory.getLogger(CatalogueControllersManagerImpl.class);
|
||||
|
@ -25,21 +24,33 @@ public class CatalogueControllersManagerImpl implements CatalogueControllersMana
|
|||
|
||||
private ConcurrentHashMap<String,CataloguePlugin> availablePlugins=new ConcurrentHashMap<String, CataloguePlugin>();
|
||||
|
||||
|
||||
public CatalogueControllersManagerImpl() {
|
||||
@PostConstruct
|
||||
public void post() {
|
||||
//load plugins
|
||||
log.debug("Loading catalogue plugins...");
|
||||
cataloguePluginsLoader=ServiceLoader.load(CataloguePlugin.class);
|
||||
for(CataloguePlugin plugin:cataloguePluginsLoader) {
|
||||
log.debug("Loading {} ",plugin.getClass());
|
||||
log.debug("Descriptor {} ",plugin.getDescriptor());
|
||||
availablePlugins.put(plugin.getDescriptor().getId(), plugin);
|
||||
}
|
||||
log.trace("Loaded {} catalogue plugins ",availablePlugins.size());
|
||||
log.debug("Loading catalogue plugins...");
|
||||
cataloguePluginsLoader=ServiceLoader.load(CataloguePlugin.class);
|
||||
for(CataloguePlugin plugin:cataloguePluginsLoader) {
|
||||
log.debug("Loading {} ",plugin.getClass());
|
||||
log.debug("Descriptor {} ",plugin.getDescriptor());
|
||||
availablePlugins.put(plugin.getDescriptor().getId(), plugin);
|
||||
}
|
||||
log.trace("Loaded {} catalogue plugins ",availablePlugins.size());
|
||||
|
||||
log.trace("Static initialization...");
|
||||
for(Entry<String,CataloguePlugin> entry:availablePlugins.entrySet()) {
|
||||
log.debug("Static initialization for : {} ",entry.getKey());
|
||||
try {
|
||||
entry.getValue().init();
|
||||
}catch(Throwable t) {
|
||||
log.error("Unexpected exception while initializing {} ",entry.getKey(),t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Set<String> getAvailableControllers() {
|
||||
return availablePlugins.keySet();
|
||||
|
@ -51,18 +62,7 @@ public class CatalogueControllersManagerImpl implements CatalogueControllersMana
|
|||
else throw new CataloguePluginNotFound("Catalogue plugin "+pluginId+" not available.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() throws InternalError {
|
||||
log.trace("Static initialization...");
|
||||
for(Entry<String,CataloguePlugin> entry:availablePlugins.entrySet()) {
|
||||
log.debug("Static initialization for : {} ",entry.getKey());
|
||||
try {
|
||||
entry.getValue().init();
|
||||
}catch(Throwable t) {
|
||||
log.error("Unexpected exception while initializing {} ",entry.getKey(),t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void initInScope() throws InternalError {
|
||||
|
|
|
@ -5,6 +5,8 @@ import java.util.Set;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.catalogues.CataloguePlugin;
|
||||
|
@ -17,7 +19,6 @@ import org.gcube.data.publishing.gCatfeeder.collectors.CollectorPlugin;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Singleton
|
||||
public class CollectorsManagerImpl implements CollectorsManager {
|
||||
|
||||
|
||||
|
@ -28,20 +29,29 @@ public class CollectorsManagerImpl implements CollectorsManager {
|
|||
|
||||
private ConcurrentHashMap<String,CollectorPlugin> availablePlugins=new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
public CollectorsManagerImpl() {
|
||||
@PostConstruct
|
||||
public void post() {
|
||||
//load plugins
|
||||
log.debug("Loading collector plugins...");
|
||||
collectorPluginsLoader=ServiceLoader.load(CollectorPlugin.class);
|
||||
for(CollectorPlugin plugin:collectorPluginsLoader) {
|
||||
log.debug("Loading {} ",plugin.getClass());
|
||||
log.debug("Descriptor {} ",plugin.getDescriptor());
|
||||
availablePlugins.put(plugin.getDescriptor().getName(), plugin);
|
||||
}
|
||||
log.trace("Loaded {} collector plugins ",availablePlugins.size());
|
||||
log.debug("Loading collector plugins...");
|
||||
collectorPluginsLoader=ServiceLoader.load(CollectorPlugin.class);
|
||||
for(CollectorPlugin plugin:collectorPluginsLoader) {
|
||||
log.debug("Loading {} ",plugin.getClass());
|
||||
log.debug("Descriptor {} ",plugin.getDescriptor());
|
||||
availablePlugins.put(plugin.getDescriptor().getName(), plugin);
|
||||
}
|
||||
log.trace("Loaded {} collector plugins ",availablePlugins.size());
|
||||
|
||||
log.trace("Static initialization...");
|
||||
for(Entry<String,CollectorPlugin> entry:availablePlugins.entrySet()) {
|
||||
log.debug("Static initialization for : {} ",entry.getKey());
|
||||
try {
|
||||
entry.getValue().init();
|
||||
}catch(Throwable t) {
|
||||
log.error("Unexpected exception while initializing {} ",entry.getKey(),t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Set<String> getAvailableCollectors() {
|
||||
return availablePlugins.keySet();
|
||||
|
@ -53,18 +63,7 @@ public class CollectorsManagerImpl implements CollectorsManager {
|
|||
else throw new CollectorNotFound("Collector plugin "+collectorId+" not available.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() throws InternalError {
|
||||
log.trace("Static initialization...");
|
||||
for(Entry<String,CollectorPlugin> entry:availablePlugins.entrySet()) {
|
||||
log.debug("Static initialization for : {} ",entry.getKey());
|
||||
try {
|
||||
entry.getValue().init();
|
||||
}catch(Throwable t) {
|
||||
log.error("Unexpected exception while initializing {} ",entry.getKey(),t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,21 +1,32 @@
|
|||
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.inject.Singleton;
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Singleton
|
||||
|
||||
public class ExecutionManagerImpl implements ExecutionManager {
|
||||
|
||||
private ThreadPoolExecutor executor=null;
|
||||
private static final Logger log= LoggerFactory.getLogger(ExecutionManagerImpl.class);
|
||||
|
||||
|
||||
private boolean defaultConfiguration=true;
|
||||
|
||||
@PostConstruct
|
||||
private void post() {
|
||||
executor=new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized void submit(ExecutionDescriptor desc) {
|
||||
log.debug("Checking if {} is already in queue");
|
||||
|
@ -23,7 +34,11 @@ public class ExecutionManagerImpl implements ExecutionManager {
|
|||
if(!executor.getQueue().contains(toSubmit)) {
|
||||
log.trace("Inserting execution in queue {} ");
|
||||
executor.execute(toSubmit);
|
||||
log.debug("Request submitted");
|
||||
}else {
|
||||
log.debug("Execution already in queue");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,7 +57,7 @@ public class ExecutionManagerImpl implements ExecutionManager {
|
|||
@Override
|
||||
public synchronized void init(ExecutionManagerConfiguration config) {
|
||||
// NEED TO BE IDEMPOTENT
|
||||
if(executor==null) {
|
||||
if(executor==null||defaultConfiguration) {
|
||||
throw new RuntimeException("NOT YET IMPLEMENTED");
|
||||
// executor=new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ConcurrentLinkedQueue<ExecutionTask>());
|
||||
}
|
||||
|
|
|
@ -103,59 +103,59 @@ public class ExecutionTask implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
log.debug("Starting to handle {} ",request);
|
||||
|
||||
log.debug("Setting caller token..");
|
||||
String actualToken=CommonUtils.decryptString(request.getCallerEncryptedToken());
|
||||
SecurityTokenProvider.instance.set(actualToken);
|
||||
|
||||
// try to lock request (FINISH ON FAIL)
|
||||
if(persistence.acquire(request.getId())) {
|
||||
try {
|
||||
|
||||
|
||||
log.info("Acquired : {} ",request);
|
||||
|
||||
ExecutionReport report=new ExecutionReport();
|
||||
report.getGenericInformations().setStartTime(Instant.now());
|
||||
report.setStartingScope(ContextUtils.getCurrentScopeName());
|
||||
|
||||
// -- ON SUCCESS reload request
|
||||
request =persistence.getById(request.getId());
|
||||
|
||||
// For ALL COLLECTORS IN REQUEST
|
||||
for(String collectorId : request.getCollectors()) {
|
||||
CollectorReport collectorReport=handleCollector(collectorId);
|
||||
report.getCollectorReports().add(collectorReport);
|
||||
|
||||
}
|
||||
|
||||
// FINALIZE
|
||||
|
||||
report.getGenericInformations().setEndTime(Instant.now());
|
||||
|
||||
String reportUrl=storeReport(report);
|
||||
log.info("Stored report at {} ",reportUrl);
|
||||
request.setReportUrl(reportUrl);
|
||||
request.setStatus(ExecutionStatus.SUCCESS);
|
||||
persistence.update(request);
|
||||
}catch(PersistenceError | InvalidRequest e) {
|
||||
log.error("Unexpected exception while dealing with persistence ",e);
|
||||
}catch(Throwable t) {
|
||||
log.error("Unexpected generic exception.",t);
|
||||
request.setStatus(ExecutionStatus.FAILED);
|
||||
persistence.update(request);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}else {
|
||||
log.debug("Request [{}] is already being managed.",request);
|
||||
}
|
||||
}catch(Throwable t) {
|
||||
log.error("THREAD CANNOT HANDLE REQUESTS!!!. ",t);
|
||||
}
|
||||
// try {
|
||||
// log.debug("Starting to handle {} ",request);
|
||||
//
|
||||
// log.debug("Setting caller token..");
|
||||
// String actualToken=CommonUtils.decryptString(request.getCallerEncryptedToken());
|
||||
// SecurityTokenProvider.instance.set(actualToken);
|
||||
//
|
||||
// // try to lock request (FINISH ON FAIL)
|
||||
// if(persistence.acquire(request.getId())) {
|
||||
// try {
|
||||
//
|
||||
//
|
||||
// log.info("Acquired : {} ",request);
|
||||
//
|
||||
// ExecutionReport report=new ExecutionReport();
|
||||
// report.getGenericInformations().setStartTime(Instant.now());
|
||||
// report.setStartingScope(ContextUtils.getCurrentScopeName());
|
||||
//
|
||||
// // -- ON SUCCESS reload request
|
||||
// request =persistence.getById(request.getId());
|
||||
//
|
||||
// // For ALL COLLECTORS IN REQUEST
|
||||
// for(String collectorId : request.getCollectors()) {
|
||||
// CollectorReport collectorReport=handleCollector(collectorId);
|
||||
// report.getCollectorReports().add(collectorReport);
|
||||
//
|
||||
// }
|
||||
//
|
||||
// // FINALIZE
|
||||
//
|
||||
// report.getGenericInformations().setEndTime(Instant.now());
|
||||
//
|
||||
// String reportUrl=storeReport(report);
|
||||
// log.info("Stored report at {} ",reportUrl);
|
||||
// request.setReportUrl(reportUrl);
|
||||
// request.setStatus(ExecutionStatus.SUCCESS);
|
||||
// persistence.update(request);
|
||||
// }catch(PersistenceError | InvalidRequest e) {
|
||||
// log.error("Unexpected exception while dealing with persistence ",e);
|
||||
// }catch(Throwable t) {
|
||||
// log.error("Unexpected generic exception.",t);
|
||||
// request.setStatus(ExecutionStatus.FAILED);
|
||||
// persistence.update(request);
|
||||
// }
|
||||
//
|
||||
//
|
||||
//
|
||||
// }else {
|
||||
// log.debug("Request [{}] is already being managed.",request);
|
||||
// }
|
||||
// }catch(Throwable t) {
|
||||
// log.error("THREAD CANNOT HANDLE REQUESTS!!!. ",t);
|
||||
// }
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
|
|||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.enterprise.context.RequestScoped;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||
|
@ -19,7 +20,6 @@ import org.gcube.data.publishing.gCatFeeder.service.model.fault.ElementNotFound;
|
|||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InvalidRequest;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.PersistenceError;
|
||||
|
||||
|
||||
public class FeederEngineImpl implements FeederEngine {
|
||||
|
||||
@Inject
|
||||
|
@ -30,7 +30,7 @@ public class FeederEngineImpl implements FeederEngine {
|
|||
private CatalogueControllersManager catalogues;
|
||||
@Inject
|
||||
private PersistenceManager persistence;
|
||||
|
||||
|
||||
@Override
|
||||
public ExecutionDescriptor submit(ExecutionRequest req) throws InternalError, PersistenceError, InvalidRequest {
|
||||
try{
|
||||
|
@ -53,9 +53,9 @@ public class FeederEngineImpl implements FeederEngine {
|
|||
return persistence.getById(id);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void verifyRequest(ExecutionRequest request) throws InternalError,CollectorNotFound,DescriptorNotFound{
|
||||
|
||||
|
||||
Set<String> availableControllers=catalogues.getAvailableControllers();
|
||||
if(request.getToInvokeControllers().size()==1&&request.getToInvokeControllers().contains("ALL")) {
|
||||
request.setToInvokeControllers(availableControllers);
|
||||
|
@ -63,17 +63,16 @@ public class FeederEngineImpl implements FeederEngine {
|
|||
for(String requestedCatalogue:request.getToInvokeControllers())
|
||||
if(!availableControllers.contains(requestedCatalogue))
|
||||
throw new DescriptorNotFound("Requested catalogue controller "+requestedCatalogue+" not found.");
|
||||
|
||||
|
||||
|
||||
|
||||
Set<String> availableCollectors=collectors.getAvailableCollectors();
|
||||
if(request.getToInvokeCollectors().size()==1&&request.getToInvokeCollectors().contains("ALL")) {
|
||||
request.setToInvokeCollectors(availableControllers);
|
||||
}
|
||||
|
||||
request.setToInvokeCollectors(availableCollectors);
|
||||
}
|
||||
for(String requestedCollector:request.getToInvokeCollectors())
|
||||
if(!availableCollectors.contains(requestedCollector))
|
||||
throw new DescriptorNotFound("Requested collector "+requestedCollector+" not found.");
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -59,7 +59,10 @@ public class Executions {
|
|||
|
||||
log.trace("Submitting request {} ",request);
|
||||
|
||||
return engine.submit(request);
|
||||
ExecutionDescriptor toReturn= engine.submit(request);
|
||||
|
||||
log.debug("Returning {} ",toReturn);
|
||||
return toReturn;
|
||||
} catch (PersistenceError e) {
|
||||
log.warn("Unexpected Exception while talking to persistnce",e);
|
||||
throw new WebApplicationException("Invalid Request.", e,Response.Status.INTERNAL_SERVER_ERROR);
|
||||
|
|
|
@ -3,6 +3,8 @@ package org.gcube.data.publishing.gCatFeeder.service;
|
|||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
import javax.ws.rs.core.Application;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||
|
@ -14,35 +16,70 @@ import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CatalogueControl
|
|||
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManagerImpl;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.mockups.PersistenceManagerMock;
|
||||
import org.glassfish.hk2.utilities.binding.AbstractBinder;
|
||||
import org.glassfish.jersey.test.JerseyTest;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Before;
|
||||
|
||||
public class BaseTest extends JerseyTest{
|
||||
|
||||
@Inject
|
||||
ExecutionManager executions;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws IOException, SQLException{
|
||||
|
||||
@Inject
|
||||
CollectorsManager collectors;
|
||||
|
||||
@Inject
|
||||
CatalogueControllersManager controllers;
|
||||
|
||||
|
||||
@Before
|
||||
public void init() throws IOException, SQLException{
|
||||
// // basic init
|
||||
// try {
|
||||
// collectors.init();
|
||||
// controllers.init();
|
||||
// } catch (InternalError e) {
|
||||
// throw new RuntimeException("Initialization Error",e);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// // scope init
|
||||
// try {
|
||||
// collectors.initInScope();
|
||||
// controllers.initInScope();
|
||||
// } catch (InternalError ex) {
|
||||
// throw new RuntimeException("Initialization Error",ex);
|
||||
// }
|
||||
//
|
||||
// executions.load();
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Application configure() {
|
||||
|
||||
|
||||
AbstractBinder binder = new AbstractBinder() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(FeederEngineImpl.class).to(FeederEngine.class);
|
||||
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class);
|
||||
bind(CollectorsManagerImpl.class).to(CollectorsManager.class);
|
||||
bind(ExecutionManagerImpl.class).to(ExecutionManager.class);
|
||||
bind(PersistenceManagerMock.class).to(PersistenceManager.class);
|
||||
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class).in(Singleton.class);
|
||||
bind(CollectorsManagerImpl.class).to(CollectorsManager.class).in(Singleton.class);
|
||||
bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class);
|
||||
bind(PersistenceManagerMock.class).to(PersistenceManager.class).in(Singleton.class);
|
||||
}
|
||||
};
|
||||
|
||||
return new GCatFeeder(binder);
|
||||
return new GCatFeeder(binder);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import javax.ws.rs.client.Entity;
|
|||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionStatus;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ExecutionsTest extends BaseTest {
|
||||
|
@ -27,11 +29,30 @@ public class ExecutionsTest extends BaseTest {
|
|||
|
||||
System.out.println(target.getUri());
|
||||
Response resp=target.request().post(Entity.json(""));
|
||||
System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
|
||||
// if(resp.getStatus()!=200) throw new RuntimeException("GetAll error should never happen");
|
||||
|
||||
// System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
|
||||
|
||||
ExecutionDescriptor desc=resp.readEntity(ExecutionDescriptor.class);
|
||||
String id=desc.getId();
|
||||
|
||||
WebTarget pollTarget=
|
||||
target(ServiceConstants.Executions.PATH).path(id);
|
||||
|
||||
boolean end=false;
|
||||
do {
|
||||
Response pollResp=pollTarget.request().get();
|
||||
if(pollResp.getStatus()!=200) throw new RuntimeException("Unexpected status "+pollResp.getStatus()+" while polling. Msg : "+pollResp.readEntity(String.class));
|
||||
else {
|
||||
ExecutionDescriptor pollResult=pollResp.readEntity(ExecutionDescriptor.class);
|
||||
System.out.println("Current status : "+pollResult.getStatus());
|
||||
switch(pollResult.getStatus()) {
|
||||
case FAILED:
|
||||
case STOPPED : throw new RuntimeException("Unexpected execution status "+pollResult.getStatus());
|
||||
case SUCCESS : end=true; break;
|
||||
default : try {
|
||||
Thread.sleep(400);} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
}while(!end);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -5,11 +5,9 @@ import java.util.Collections;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.enterprise.inject.Default;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptorFilter;
|
||||
|
@ -19,7 +17,7 @@ import org.gcube.data.publishing.gCatFeeder.service.model.fault.ElementNotFound;
|
|||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InvalidRequest;
|
||||
import org.gcube.data.publishing.gCatFeeder.service.model.fault.PersistenceError;
|
||||
|
||||
@Singleton
|
||||
|
||||
public class PersistenceManagerMock implements PersistenceManager {
|
||||
|
||||
@Override
|
||||
|
@ -28,7 +26,7 @@ public class PersistenceManagerMock implements PersistenceManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ExecutionDescriptor getById(String id) throws PersistenceError, ElementNotFound, InvalidRequest {
|
||||
public ExecutionDescriptor getById(String id) throws PersistenceError, ElementNotFound{
|
||||
if(theMap.containsKey(id))
|
||||
return theMap.get(id);
|
||||
else throw new ElementNotFound("Unable to find request with id "+id);
|
||||
|
@ -51,12 +49,13 @@ public class PersistenceManagerMock implements PersistenceManager {
|
|||
|
||||
@Override
|
||||
public boolean acquire(String id) throws PersistenceError, ElementNotFound {
|
||||
theMap.computeIfPresent(id, BiFunction<String, ExecutionDescriptor, ExecutionDescriptor> bi = (x,y) ->{
|
||||
|
||||
});
|
||||
ExecutionDescriptor desc=getById(id);
|
||||
if(desc.getStatus().equals(ExecutionStatus.PENDING))
|
||||
desc.setStatus(ExecutionStatus.RUNNING);
|
||||
return update(desc);
|
||||
}
|
||||
|
||||
// Actuall persistence in map
|
||||
// Actual persistence in map
|
||||
|
||||
private ConcurrentHashMap<String,ExecutionDescriptor> theMap=new ConcurrentHashMap<>();
|
||||
|
||||
|
|
Loading…
Reference in New Issue