git-svn-id: http://svn.research-infrastructures.eu/public/d4science/gcube/trunk/data-publishing/gCat-Feeder-Suite@178556 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
24209aa014
commit
5dd63116b6
|
@ -1,11 +1,13 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.catalogues;
|
package org.gcube.data.publishing.gCatFeeder.catalogues;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
import org.gcube.data.publishing.gCatFeeder.catalogues.model.PublishReport;
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.PublishReport;
|
||||||
import org.gcube.data.publishing.gCatFeeder.catalogues.model.faults.WrongObjectFormatException;
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.faults.WrongObjectFormatException;
|
||||||
|
|
||||||
public interface CatalogueController {
|
public interface CatalogueController {
|
||||||
|
|
||||||
public PublishReport publishItem(String serializedItem) throws WrongObjectFormatException;
|
public PublishReport publishItem(Serializable toPublish) throws WrongObjectFormatException;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.catalogues.model;
|
package org.gcube.data.publishing.gCatFeeder.catalogues.model;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.NonNull;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
|
|
||||||
|
@ -8,9 +9,12 @@ import lombok.Setter;
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Setter
|
@Setter
|
||||||
public class PublishReport {
|
public class PublishReport {
|
||||||
|
|
||||||
private boolean successful=false;
|
|
||||||
|
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private Boolean successful;
|
||||||
|
|
||||||
|
@NonNull
|
||||||
private String publishedId;
|
private String publishedId;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
package org.gcube.data.publishing.gCatfeeder.collectors;
|
package org.gcube.data.publishing.gCatfeeder.collectors;
|
||||||
|
|
||||||
import org.gcube.data.publishing.gCatFeeder.model.CatalogueInstanceDescriptor;
|
import org.gcube.data.publishing.gCatFeeder.model.CatalogueInstanceDescriptor;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CatalogueInstanceNotFound;
|
||||||
|
|
||||||
public interface CatalogueRetriever {
|
public interface CatalogueRetriever {
|
||||||
|
|
||||||
public CatalogueInstanceDescriptor getInstance();
|
public CatalogueInstanceDescriptor getInstance() throws CatalogueInstanceNotFound;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +1,21 @@
|
||||||
package org.gcube.data.publishing.gCatfeeder.collectors;
|
package org.gcube.data.publishing.gCatfeeder.collectors;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.gcube.data.publishing.gCatFeeder.model.FormatData;
|
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.CustomData;
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.CustomData;
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.PluginDescriptor;
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.PluginDescriptor;
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.PublisherFormatData;
|
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CatalogueNotSupportedException;
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CatalogueNotSupportedException;
|
||||||
|
|
||||||
public interface CollectorPlugin<E extends CustomData>{
|
public interface CollectorPlugin<E extends CustomData>{
|
||||||
|
|
||||||
public PluginDescriptor getDescriptor();
|
public PluginDescriptor getDescriptor();
|
||||||
|
|
||||||
public CatalogueRetriever getRetrieverByCatalogueType(FormatData catalogueType) throws CatalogueNotSupportedException;
|
public CatalogueRetriever getRetrieverByCatalogueType(String catalogueType) throws CatalogueNotSupportedException;
|
||||||
|
|
||||||
public Set<DataTransformer<? extends PublisherFormatData,E>> getImplementedTransformers();
|
public Set<String> getSupportedCatalogueTypes();
|
||||||
|
|
||||||
|
public DataTransformer<Serializable,E> getTransformerByCatalogueType(String catalogueType);
|
||||||
|
|
||||||
public DataCollector<E> getCollector();
|
public DataCollector<E> getCollector();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,13 @@
|
||||||
package org.gcube.data.publishing.gCatfeeder.collectors;
|
package org.gcube.data.publishing.gCatfeeder.collectors;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.gcube.data.publishing.gCatFeeder.model.FormatData;
|
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.CustomData;
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.CustomData;
|
||||||
import org.gcube.data.publishing.gCatfeeder.collectors.model.PublisherFormatData;
|
|
||||||
|
|
||||||
public interface DataTransformer<T extends PublisherFormatData,E extends CustomData> {
|
public interface DataTransformer<T extends Serializable,E extends CustomData> {
|
||||||
|
|
||||||
public FormatData getTargetFormat();
|
|
||||||
|
|
||||||
public Set<T> transform(Collection<E> collectedData);
|
public Set<T> transform(Collection<E> collectedData);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
package org.gcube.data.publishing.gCatfeeder.collectors.model;
|
|
||||||
|
|
||||||
import org.gcube.data.publishing.gCatFeeder.model.FormatData;
|
|
||||||
|
|
||||||
public interface PublisherFormatData {
|
|
||||||
|
|
||||||
|
|
||||||
public FormatData getFormat();
|
|
||||||
|
|
||||||
public String serialize();
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,27 +1,27 @@
|
||||||
package org.gcube.data.publishing.gCatfeeder.collectors.model.faults;
|
package org.gcube.data.publishing.gCatfeeder.collectors.model.faults;
|
||||||
|
|
||||||
public class CatalogueNotFound extends CrawlerException {
|
public class CatalogueInstanceNotFound extends CrawlerException {
|
||||||
|
|
||||||
public CatalogueNotFound() {
|
public CatalogueInstanceNotFound() {
|
||||||
// TODO Auto-generated constructor stub
|
// TODO Auto-generated constructor stub
|
||||||
}
|
}
|
||||||
|
|
||||||
public CatalogueNotFound(String message) {
|
public CatalogueInstanceNotFound(String message) {
|
||||||
super(message);
|
super(message);
|
||||||
// TODO Auto-generated constructor stub
|
// TODO Auto-generated constructor stub
|
||||||
}
|
}
|
||||||
|
|
||||||
public CatalogueNotFound(Throwable cause) {
|
public CatalogueInstanceNotFound(Throwable cause) {
|
||||||
super(cause);
|
super(cause);
|
||||||
// TODO Auto-generated constructor stub
|
// TODO Auto-generated constructor stub
|
||||||
}
|
}
|
||||||
|
|
||||||
public CatalogueNotFound(String message, Throwable cause) {
|
public CatalogueInstanceNotFound(String message, Throwable cause) {
|
||||||
super(message, cause);
|
super(message, cause);
|
||||||
// TODO Auto-generated constructor stub
|
// TODO Auto-generated constructor stub
|
||||||
}
|
}
|
||||||
|
|
||||||
public CatalogueNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
public CatalogueInstanceNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||||
super(message, cause, enableSuppression, writableStackTrace);
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
// TODO Auto-generated constructor stub
|
// TODO Auto-generated constructor stub
|
||||||
}
|
}
|
|
@ -28,6 +28,11 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.gcube.core</groupId>
|
||||||
|
<artifactId>common-encryption</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
|
@ -1,5 +0,0 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.model;
|
|
||||||
|
|
||||||
public enum FormatData {
|
|
||||||
CKAN
|
|
||||||
}
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.utils;
|
||||||
|
|
||||||
|
import org.gcube.common.encryption.encrypter.StringEncrypter;
|
||||||
|
|
||||||
|
public class CommonUtils {
|
||||||
|
|
||||||
|
public static String decryptString(String toDecrypt){
|
||||||
|
try{
|
||||||
|
return StringEncrypter.getEncrypter().decrypt(toDecrypt);
|
||||||
|
}catch(Exception e) {
|
||||||
|
throw new RuntimeException("Unable to decrypt : "+toDecrypt,e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String encryptString(String toEncrypt){
|
||||||
|
try{
|
||||||
|
return StringEncrypter.getEncrypter().encrypt(toEncrypt);
|
||||||
|
}catch(Exception e) {
|
||||||
|
throw new RuntimeException("Unable to encrypt : "+toEncrypt,e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,6 +39,12 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.gcube.data-publishing.gCat-Feeder</groupId>
|
||||||
|
<artifactId>catalogue-plugin-framework</artifactId>
|
||||||
|
<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.gcube.core</groupId>
|
<groupId>org.gcube.core</groupId>
|
||||||
|
|
|
@ -1,19 +1,25 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.service;
|
package org.gcube.data.publishing.gCatFeeder.service;
|
||||||
|
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
|
||||||
import org.gcube.smartgears.ApplicationManager;
|
import org.gcube.smartgears.ApplicationManager;
|
||||||
|
|
||||||
public class GCatFeederManager implements ApplicationManager{
|
public class GCatFeederManager implements ApplicationManager{
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ExecutionManager executions;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onInit() {
|
public void onInit() {
|
||||||
|
executions.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onShutdown() {
|
public void onShutdown() {
|
||||||
// TODO Auto-generated method stub
|
executions.stop();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,17 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.service.engine;
|
package org.gcube.data.publishing.gCatFeeder.service.engine;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.CataloguePlugin;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.CataloguePluginNotFound;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
|
||||||
|
|
||||||
public interface CatalogueControllersManager {
|
public interface CatalogueControllersManager {
|
||||||
|
|
||||||
|
|
||||||
|
public Set<String> getAvailableControllers();
|
||||||
|
public CataloguePlugin getPluginById(String collectorId) throws CataloguePluginNotFound;
|
||||||
|
|
||||||
|
public void init() throws InternalError;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import org.gcube.data.publishing.gCatfeeder.collectors.CollectorPlugin;
|
||||||
public interface CollectorsManager {
|
public interface CollectorsManager {
|
||||||
|
|
||||||
public Set<String> getAvailableCollectors();
|
public Set<String> getAvailableCollectors();
|
||||||
public CollectorPlugin<?> getPluginById() throws CollectorNotFound;
|
public CollectorPlugin<?> getPluginById(String collectorId) throws CollectorNotFound;
|
||||||
|
|
||||||
public void init() throws InternalError;
|
public void init() throws InternalError;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.service.engine;
|
package org.gcube.data.publishing.gCatFeeder.service.engine;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerConfiguration;
|
||||||
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||||
|
|
||||||
public interface ExecutionManager {
|
public interface ExecutionManager {
|
||||||
|
@ -7,4 +8,8 @@ public interface ExecutionManager {
|
||||||
public void submit(ExecutionDescriptor desc);
|
public void submit(ExecutionDescriptor desc);
|
||||||
|
|
||||||
public void stop();
|
public void stop();
|
||||||
|
|
||||||
|
public void load();
|
||||||
|
|
||||||
|
public void init(ExecutionManagerConfiguration config);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,5 +15,18 @@ public interface PersistenceManager {
|
||||||
public ExecutionDescriptor getById(String id)throws PersistenceError,ElementNotFound,InvalidRequest;
|
public ExecutionDescriptor getById(String id)throws PersistenceError,ElementNotFound,InvalidRequest;
|
||||||
public Collection<ExecutionDescriptor> get(ExecutionDescriptorFilter filter)throws PersistenceError,InvalidRequest;
|
public Collection<ExecutionDescriptor> get(ExecutionDescriptorFilter filter)throws PersistenceError,InvalidRequest;
|
||||||
|
|
||||||
|
|
||||||
|
// DIRECT QUERIES
|
||||||
|
public boolean update(ExecutionDescriptor toUpdate)throws PersistenceError,ElementNotFound;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates status only if current status value is PENDING
|
||||||
|
*
|
||||||
|
* @param id
|
||||||
|
* @return
|
||||||
|
* @throws PersistenceError
|
||||||
|
* @throws ElementNotFound
|
||||||
|
*/
|
||||||
|
public boolean acquire(String id)throws PersistenceError,ElementNotFound;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
|
||||||
|
|
||||||
|
public class ExecutionManagerConfiguration {
|
||||||
|
|
||||||
|
private int threadPoolSize;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class ExecutionManagerImpl implements ExecutionManager {
|
||||||
|
|
||||||
|
private ThreadPoolExecutor executor=null;
|
||||||
|
private static final Logger log= LoggerFactory.getLogger(ExecutionManagerImpl.class);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void submit(ExecutionDescriptor desc) {
|
||||||
|
log.debug("Checking if {} is already in queue");
|
||||||
|
ExecutionTask toSubmit=new ExecutionTask(desc);
|
||||||
|
if(!executor.getQueue().contains(toSubmit)) {
|
||||||
|
log.trace("Inserting execution in queue {} ");
|
||||||
|
executor.execute(toSubmit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
// Clear queue
|
||||||
|
// Stop all
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void load() {
|
||||||
|
// connect to persistence
|
||||||
|
// load all pending
|
||||||
|
throw new RuntimeException("NOT YET IMPLEMENTED");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void init(ExecutionManagerConfiguration config) {
|
||||||
|
// NEED TO BE IDEMPOTENT
|
||||||
|
if(executor==null) {
|
||||||
|
throw new RuntimeException("NOT YET IMPLEMENTED");
|
||||||
|
// executor=new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ConcurrentLinkedQueue<ExecutionTask>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,265 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.CatalogueController;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.CataloguePlugin;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.PublishReport;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.faults.ControllerInstantiationFault;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.faults.WrongObjectFormatException;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.model.CatalogueInstanceDescriptor;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionStatus;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.CataloguePluginNotFound;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.CollectorNotFound;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InvalidRequest;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.fault.PersistenceError;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.reports.CatalogueReport;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.reports.CollectorReport;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.service.model.reports.ExecutionReport;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.utils.CommonUtils;
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.utils.ContextUtils;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.CatalogueRetriever;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.CollectorPlugin;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.DataCollector;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.DataTransformer;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CatalogueInstanceNotFound;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CatalogueNotSupportedException;
|
||||||
|
import org.gcube.data.publishing.gCatfeeder.collectors.model.faults.CollectorFault;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class ExecutionTask implements Runnable {
|
||||||
|
|
||||||
|
private static final Logger log= LoggerFactory.getLogger(ExecutionTask.class);
|
||||||
|
|
||||||
|
private ExecutionDescriptor request;
|
||||||
|
private PersistenceManager persistence;
|
||||||
|
private CollectorsManager collectors;
|
||||||
|
private CatalogueControllersManager catalogues;
|
||||||
|
|
||||||
|
public ExecutionTask(ExecutionDescriptor desc) {
|
||||||
|
super();
|
||||||
|
this.request=desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + ((request == null) ? 0 : request.hashCode());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (obj == null)
|
||||||
|
return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
ExecutionTask other = (ExecutionTask) obj;
|
||||||
|
if (request == null) {
|
||||||
|
if (other.request != null)
|
||||||
|
return false;
|
||||||
|
} else if (!request.equals(other.request))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setPersistence(PersistenceManager p) {
|
||||||
|
this.persistence=p;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setCollectorPluginManager(CollectorsManager c) {
|
||||||
|
this.collectors=c;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public void setCataloguesPluginManager(CatalogueControllersManager c) {
|
||||||
|
this.catalogues=c;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
log.debug("Starting to handle {} ",request);
|
||||||
|
|
||||||
|
log.debug("Setting caller token..");
|
||||||
|
String actualToken=CommonUtils.decryptString(request.getCallerEncryptedToken());
|
||||||
|
SecurityTokenProvider.instance.set(actualToken);
|
||||||
|
|
||||||
|
// try to lock request (FINISH ON FAIL)
|
||||||
|
if(persistence.acquire(request.getId())) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
|
||||||
|
log.info("Acquired : {} ",request);
|
||||||
|
|
||||||
|
ExecutionReport report=new ExecutionReport();
|
||||||
|
report.getGenericInformations().setStartTime(Instant.now());
|
||||||
|
report.setStartingScope(ContextUtils.getCurrentScopeName());
|
||||||
|
|
||||||
|
// -- ON SUCCESS reload request
|
||||||
|
request =persistence.getById(request.getId());
|
||||||
|
|
||||||
|
// For ALL COLLECTORS IN REQUEST
|
||||||
|
for(String collectorId : request.getCollectors()) {
|
||||||
|
CollectorReport collectorReport=handleCollector(collectorId);
|
||||||
|
report.getCollectorReports().add(collectorReport);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// FINALIZE
|
||||||
|
|
||||||
|
report.getGenericInformations().setEndTime(Instant.now());
|
||||||
|
|
||||||
|
String reportUrl=storeReport(report);
|
||||||
|
log.info("Stored report at {} ",reportUrl);
|
||||||
|
request.setReportUrl(reportUrl);
|
||||||
|
request.setStatus(ExecutionStatus.SUCCESS);
|
||||||
|
persistence.update(request);
|
||||||
|
}catch(PersistenceError | InvalidRequest e) {
|
||||||
|
log.error("Unexpected exception while dealing with persistence ",e);
|
||||||
|
}catch(Throwable t) {
|
||||||
|
log.error("Unexpected generic exception.",t);
|
||||||
|
request.setStatus(ExecutionStatus.FAILED);
|
||||||
|
persistence.update(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}else {
|
||||||
|
log.debug("Request [{}] is already being managed.",request);
|
||||||
|
}
|
||||||
|
}catch(Throwable t) {
|
||||||
|
log.error("THREAD CANNOT HANDLE REQUESTS!!!. ",t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private CatalogueReport handleCatalogueController(String cataloguePluginId,CollectorPlugin collectorPlugin,Set<Serializable> collectedData) {
|
||||||
|
CatalogueReport catalogueReport=new CatalogueReport();
|
||||||
|
catalogueReport.getGenericInformations().setStartTime(Instant.now());
|
||||||
|
try {
|
||||||
|
log.debug("Checking catalogue {} support ",cataloguePluginId);
|
||||||
|
if(collectorPlugin.getSupportedCatalogueTypes().contains(cataloguePluginId)) {
|
||||||
|
CataloguePlugin cataloguePlugin=catalogues.getPluginById(cataloguePluginId);
|
||||||
|
// ** INSTANTIATE CATALOGUE CONTROLLER
|
||||||
|
log.debug("Looking for catalogue instance ..");
|
||||||
|
CatalogueRetriever retriever=collectorPlugin.getRetrieverByCatalogueType(cataloguePluginId);
|
||||||
|
CatalogueInstanceDescriptor instanceDescriptor=retriever.getInstance();
|
||||||
|
CatalogueController controller=cataloguePlugin.instantiateController(instanceDescriptor);
|
||||||
|
|
||||||
|
// ** TRANSFORM
|
||||||
|
log.debug("Transforming Collected Data");
|
||||||
|
DataTransformer transformer=collectorPlugin.getTransformerByCatalogueType(cataloguePluginId);
|
||||||
|
Set<Serializable> transformed=transformer.transform(collectedData);
|
||||||
|
log.trace("Going to publish {} items to {} ",transformed.size(),instanceDescriptor.getUrl());
|
||||||
|
|
||||||
|
// ** PUBLISH VIA CONTROLLER
|
||||||
|
for(Serializable item : transformed) {
|
||||||
|
try {
|
||||||
|
PublishReport itemReport=controller.publishItem(item);
|
||||||
|
}catch(WrongObjectFormatException e) {
|
||||||
|
catalogueReport.getPublishedRecords().add(new PublishReport(false,"Wrong format : "+e.getMessage()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(true);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage("Data published. See detailed log.");
|
||||||
|
|
||||||
|
}else {
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(false);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage("Catalogue not supported by Collector Plugin.");
|
||||||
|
}
|
||||||
|
}catch(CataloguePluginNotFound e) {
|
||||||
|
String msg="Supported catalogue implementation not found. Catalogue id : "+cataloguePluginId;
|
||||||
|
log.warn(msg,e);
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(false);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
} catch (CatalogueNotSupportedException e) {
|
||||||
|
String msg="Catalogue not supported by Collector Plugin.";
|
||||||
|
log.warn(msg,e);
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(false);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
} catch (ControllerInstantiationFault e) {
|
||||||
|
String msg="Unable to contact Catalogue instance.";
|
||||||
|
log.warn(msg,e);
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(false);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
} catch (CatalogueInstanceNotFound e) {
|
||||||
|
String msg="Unable to find Catalogue instance.";
|
||||||
|
log.warn(msg,e);
|
||||||
|
catalogueReport.getGenericInformations().setSuccess(false);
|
||||||
|
catalogueReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
}
|
||||||
|
catalogueReport.getGenericInformations().setEndTime(Instant.now());
|
||||||
|
return catalogueReport;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private CollectorReport handleCollector(String collectorId) {
|
||||||
|
CollectorReport collectorReport=new CollectorReport();
|
||||||
|
collectorReport.getGenericInformations().setStartTime(Instant.now());
|
||||||
|
collectorReport.setSource(collectorId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// * COLLECT DATA
|
||||||
|
log.info("Starting collector {} ",collectorId);
|
||||||
|
CollectorPlugin collectorPlugin=collectors.getPluginById(collectorId);
|
||||||
|
DataCollector collector=collectorPlugin.getCollector();
|
||||||
|
log.info("Collecting data..");
|
||||||
|
Set collectedData=collector.collect();
|
||||||
|
log.debug("Collected {} items. Going to transform..",collectedData.size());
|
||||||
|
collectorReport.setCollectedItems(collectedData.size());
|
||||||
|
|
||||||
|
// * FOR ALL CATALOGUES IN REQUEST
|
||||||
|
for(String cataloguePluginId:request.getCatalogues()) {
|
||||||
|
CatalogueReport catalogueReport=handleCatalogueController(cataloguePluginId,collectorPlugin,collectedData);
|
||||||
|
|
||||||
|
collectorReport.getPublisherReports().add(catalogueReport);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (CollectorNotFound e) {
|
||||||
|
String msg="Requested collector implementation not found.";
|
||||||
|
log.warn(msg,e);
|
||||||
|
collectorReport.getGenericInformations().setSuccess(false);
|
||||||
|
collectorReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
} catch (CollectorFault e) {
|
||||||
|
String msg="Collector Failed. "+e.getMessage();
|
||||||
|
log.warn(msg,e);
|
||||||
|
collectorReport.getGenericInformations().setSuccess(false);
|
||||||
|
collectorReport.getGenericInformations().setGenericMessage(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
collectorReport.getGenericInformations().setEndTime(Instant.now());
|
||||||
|
return collectorReport;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static final String storeReport(ExecutionReport report) {
|
||||||
|
throw new RuntimeException("Implement me");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,5 +1,95 @@
|
||||||
package org.gcube.data.publishing.gCatFeeder.service.model;
|
package org.gcube.data.publishing.gCatFeeder.service.model;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class ExecutionDescriptor {
|
public class ExecutionDescriptor {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private Set<String> collectors;
|
||||||
|
private Set<String> catalogues;
|
||||||
|
|
||||||
|
|
||||||
|
private String callerEncryptedToken;
|
||||||
|
private String callerIdentity;
|
||||||
|
private String callerContext;
|
||||||
|
|
||||||
|
private ExecutionStatus status;
|
||||||
|
private String reportUrl;
|
||||||
|
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
public void setId(String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
public Set<String> getCollectors() {
|
||||||
|
return collectors;
|
||||||
|
}
|
||||||
|
public void setCollectors(Set<String> collectors) {
|
||||||
|
this.collectors = collectors;
|
||||||
|
}
|
||||||
|
public Set<String> getCatalogues() {
|
||||||
|
return catalogues;
|
||||||
|
}
|
||||||
|
public void setCatalogues(Set<String> catalogues) {
|
||||||
|
this.catalogues = catalogues;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCallerEncryptedToken() {
|
||||||
|
return callerEncryptedToken;
|
||||||
|
}
|
||||||
|
public void setCallerEncryptedToken(String callerEncryptedToken) {
|
||||||
|
this.callerEncryptedToken = callerEncryptedToken;
|
||||||
|
}
|
||||||
|
public String getCallerIdentity() {
|
||||||
|
return callerIdentity;
|
||||||
|
}
|
||||||
|
public void setCallerIdentity(String callerIdentity) {
|
||||||
|
this.callerIdentity = callerIdentity;
|
||||||
|
}
|
||||||
|
public String getCallerContext() {
|
||||||
|
return callerContext;
|
||||||
|
}
|
||||||
|
public void setCallerContext(String callerContext) {
|
||||||
|
this.callerContext = callerContext;
|
||||||
|
}
|
||||||
|
public ExecutionStatus getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
public void setStatus(ExecutionStatus status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
public String getReportUrl() {
|
||||||
|
return reportUrl;
|
||||||
|
}
|
||||||
|
public void setReportUrl(String reportUrl) {
|
||||||
|
this.reportUrl = reportUrl;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + ((id == null) ? 0 : id.hashCode());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (obj == null)
|
||||||
|
return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
ExecutionDescriptor other = (ExecutionDescriptor) obj;
|
||||||
|
if (id == null) {
|
||||||
|
if (other.id != null)
|
||||||
|
return false;
|
||||||
|
} else if (!id.equals(other.id))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model;
|
||||||
|
|
||||||
|
public enum ExecutionStatus {
|
||||||
|
|
||||||
|
PENDING,
|
||||||
|
RUNNING,
|
||||||
|
STOPPED,
|
||||||
|
FAILED,
|
||||||
|
SUCCESS
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model.fault;
|
||||||
|
|
||||||
|
public class CataloguePluginNotFound extends Exception {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = -2510068055460236480L;
|
||||||
|
|
||||||
|
public CataloguePluginNotFound() {
|
||||||
|
super();
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CataloguePluginNotFound(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
|
||||||
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CataloguePluginNotFound(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CataloguePluginNotFound(String message) {
|
||||||
|
super(message);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CataloguePluginNotFound(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model.reports;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gCatFeeder.catalogues.model.PublishReport;
|
||||||
|
|
||||||
|
public class CatalogueReport{
|
||||||
|
private GenericInfos genericInformations=new GenericInfos();
|
||||||
|
private ArrayList<PublishReport> publishedRecords=new ArrayList<>();
|
||||||
|
|
||||||
|
public GenericInfos getGenericInformations() {
|
||||||
|
return genericInformations;
|
||||||
|
}
|
||||||
|
public void setGenericInformations(GenericInfos genericInformations) {
|
||||||
|
this.genericInformations = genericInformations;
|
||||||
|
}
|
||||||
|
public ArrayList<PublishReport> getPublishedRecords() {
|
||||||
|
return publishedRecords;
|
||||||
|
}
|
||||||
|
public void setPublishedRecords(ArrayList<PublishReport> publishedRecords) {
|
||||||
|
this.publishedRecords = publishedRecords;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model.reports;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class CollectorReport{
|
||||||
|
private GenericInfos genericInformations=new GenericInfos();
|
||||||
|
|
||||||
|
private String source;
|
||||||
|
private long collectedItems;
|
||||||
|
|
||||||
|
private ArrayList<CatalogueReport> publisherReports=new ArrayList<>();
|
||||||
|
|
||||||
|
public GenericInfos getGenericInformations() {
|
||||||
|
return genericInformations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGenericInformations(GenericInfos genericInformations) {
|
||||||
|
this.genericInformations = genericInformations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSource() {
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSource(String source) {
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCollectedItems() {
|
||||||
|
return collectedItems;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCollectedItems(long collectedItems) {
|
||||||
|
this.collectedItems = collectedItems;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayList<CatalogueReport> getPublisherReports() {
|
||||||
|
return publisherReports;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPublisherReports(ArrayList<CatalogueReport> publisherReports) {
|
||||||
|
this.publisherReports = publisherReports;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model.reports;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class ExecutionReport {
|
||||||
|
|
||||||
|
private GenericInfos genericInformations=new GenericInfos();
|
||||||
|
|
||||||
|
private String startingScope;
|
||||||
|
|
||||||
|
private ArrayList<CollectorReport> collectorReports=new ArrayList<>();
|
||||||
|
|
||||||
|
public GenericInfos getGenericInformations() {
|
||||||
|
return genericInformations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGenericInformations(GenericInfos genericInformations) {
|
||||||
|
this.genericInformations = genericInformations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStartingScope() {
|
||||||
|
return startingScope;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStartingScope(String startingScope) {
|
||||||
|
this.startingScope = startingScope;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayList<CollectorReport> getCollectorReports() {
|
||||||
|
return collectorReports;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCollectorReports(ArrayList<CollectorReport> collectorReports) {
|
||||||
|
this.collectorReports = collectorReports;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package org.gcube.data.publishing.gCatFeeder.service.model.reports;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
public class GenericInfos{
|
||||||
|
|
||||||
|
private Instant startTime;
|
||||||
|
private Instant endTime;
|
||||||
|
private Boolean success=false;
|
||||||
|
private String genericMessage;
|
||||||
|
|
||||||
|
|
||||||
|
public Instant getStartTime() {
|
||||||
|
return startTime;
|
||||||
|
}
|
||||||
|
public void setStartTime(Instant startTime) {
|
||||||
|
this.startTime = startTime;
|
||||||
|
}
|
||||||
|
public Instant getEndTime() {
|
||||||
|
return endTime;
|
||||||
|
}
|
||||||
|
public void setEndTime(Instant endTime) {
|
||||||
|
this.endTime = endTime;
|
||||||
|
}
|
||||||
|
public Boolean getSuccess() {
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
public void setSuccess(Boolean success) {
|
||||||
|
this.success = success;
|
||||||
|
}
|
||||||
|
public String getGenericMessage() {
|
||||||
|
return genericMessage;
|
||||||
|
}
|
||||||
|
public void setGenericMessage(String genericMessage) {
|
||||||
|
this.genericMessage = genericMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue