This commit is contained in:
Fabio Sinibaldi 2019-03-27 17:03:51 +00:00
parent 9705974338
commit c1b2e7d353
20 changed files with 413 additions and 125 deletions

View File

@ -80,6 +80,23 @@
<artifactId>commons-dbcp2</artifactId> <artifactId>commons-dbcp2</artifactId>
<version>2.0.1</version> <version>2.0.1</version>
</dependency> </dependency>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.1-901.jdbc4</version>
</dependency>
<!-- STORAGE -->
<dependency>
<groupId>org.gcube.contentmanagement</groupId>
<artifactId>storage-manager-core</artifactId>
<version>[2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT)</version>
</dependency>
<dependency>
<groupId>org.gcube.contentmanagement</groupId>
<artifactId>storage-manager-wrapper</artifactId>
<version>[2.0.0-SNAPSHOT, 3.0.0-SNAPSHOT)</version>
</dependency>
<!-- test --> <!-- test -->
@ -110,7 +127,7 @@
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
<artifactId>h2</artifactId> <artifactId>h2</artifactId>
<version>1.3.170</version> <version>1.4.198</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

View File

@ -5,15 +5,21 @@ import javax.ws.rs.ApplicationPath;
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager; import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager; import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager; import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.FeederEngine; import org.gcube.data.publishing.gCatFeeder.service.engine.FeederEngine;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure; import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage; import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CatalogueControllersManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CatalogueControllersManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.InfrastructureUtilsImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.InfrastructureUtilsImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.LocalConfigurationImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.ConnectionManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.PersistenceManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.StorageImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.StorageImpl;
import org.gcube.data.publishing.gCatFeeder.service.rest.Capabilities; import org.gcube.data.publishing.gCatFeeder.service.rest.Capabilities;
import org.gcube.data.publishing.gCatFeeder.service.rest.Executions; import org.gcube.data.publishing.gCatFeeder.service.rest.Executions;
@ -35,6 +41,9 @@ public class GCatFeeder extends ResourceConfig{
bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class); bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class);
bind(InfrastructureUtilsImpl.class).to(Infrastructure.class); bind(InfrastructureUtilsImpl.class).to(Infrastructure.class);
bind(StorageImpl.class).to(Storage.class); bind(StorageImpl.class).to(Storage.class);
bind(PersistenceManagerImpl.class).to(PersistenceManager.class);
bind(ConnectionManagerImpl.class).to(ConnectionManager.class).in(Singleton.class);
bind(LocalConfigurationImpl.class).to(LocalConfiguration.class).in(Singleton.class);
} }
}; };
register(binder); register(binder);

View File

@ -3,6 +3,8 @@ package org.gcube.data.publishing.gCatFeeder.service.engine;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
public interface ConnectionManager { public interface ConnectionManager {

View File

@ -1,6 +1,5 @@
package org.gcube.data.publishing.gCatFeeder.service.engine; package org.gcube.data.publishing.gCatFeeder.service.engine;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerConfiguration;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
public interface ExecutionManager { public interface ExecutionManager {
@ -11,5 +10,4 @@ public interface ExecutionManager {
public void load(); public void load();
public void init(ExecutionManagerConfiguration config);
} }

View File

@ -1,8 +0,0 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
public class ExecutionManagerConfiguration {
private int threadPoolSize;
}

View File

@ -11,6 +11,7 @@ import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersM
import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager; import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager; import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure; import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager; import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage; import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
@ -23,14 +24,18 @@ public class ExecutionManagerImpl implements ExecutionManager {
private ThreadPoolExecutor executor=null; private ThreadPoolExecutor executor=null;
private static final Logger log= LoggerFactory.getLogger(ExecutionManagerImpl.class); private static final Logger log= LoggerFactory.getLogger(ExecutionManagerImpl.class);
private boolean defaultConfiguration=true; @Inject
private PersistenceManager persistence; private PersistenceManager persistence;
@Inject
private CollectorsManager collectors; private CollectorsManager collectors;
@Inject
private CatalogueControllersManager catalogues; private CatalogueControllersManager catalogues;
@Inject
private Infrastructure infrastructure; private Infrastructure infrastructure;
@Inject
private Storage storage; private Storage storage;
@Inject
private LocalConfiguration config;
@PostConstruct @PostConstruct
@ -39,30 +44,30 @@ public class ExecutionManagerImpl implements ExecutionManager {
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
} }
@Inject // @Inject
public void setPersistence(PersistenceManager p) { // public void setPersistence(PersistenceManager p) {
this.persistence=p; // this.persistence=p;
} // }
//
@Inject // @Inject
public void setCollectorPluginManager(CollectorsManager c) { // public void setCollectorPluginManager(CollectorsManager c) {
this.collectors=c; // this.collectors=c;
} // }
//
@Inject // @Inject
public void setCataloguesPluginManager(CatalogueControllersManager c) { // public void setCataloguesPluginManager(CatalogueControllersManager c) {
this.catalogues=c; // this.catalogues=c;
} // }
//
//
@Inject // @Inject
public void setInfastructureInterface(Infrastructure infra) { // public void setInfastructureInterface(Infrastructure infra) {
this.infrastructure=infra; // this.infrastructure=infra;
} // }
@Inject // @Inject
public void setStorage(Storage storage) { // public void setStorage(Storage storage) {
this.storage = storage; // this.storage = storage;
} // }
@Override @Override
public synchronized void submit(ExecutionDescriptor desc) { public synchronized void submit(ExecutionDescriptor desc) {
@ -97,13 +102,6 @@ public class ExecutionManagerImpl implements ExecutionManager {
throw new RuntimeException("NOT YET IMPLEMENTED"); throw new RuntimeException("NOT YET IMPLEMENTED");
} }
@Override
public synchronized void init(ExecutionManagerConfiguration config) {
// NEED TO BE IDEMPOTENT
if(executor==null||defaultConfiguration) {
throw new RuntimeException("NOT YET IMPLEMENTED");
// executor=new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ConcurrentLinkedQueue<ExecutionTask>());
}
}
} }

View File

@ -0,0 +1,30 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
import java.io.IOException;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.gcube.data.publishing.gCatFeeder.service.BaseTest;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
public class LocalConfigurationImpl implements LocalConfiguration {
private Properties props;
@PostConstruct
public void load() {
props=new Properties();
try{
props.load(BaseTest.class.getResourceAsStream("/gcat-feeder-config.properties"));
}catch(IOException e) {throw new RuntimeException(e);}
}
@Override
public String getProperty(String propertyName) {
if(props.isEmpty()) throw new RuntimeException("No properties loaded");
return props.getProperty(propertyName);
}
}

View File

@ -15,6 +15,7 @@ import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager; import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure; import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration; import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
@ -75,7 +76,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
} }
private synchronized DataSource getDataSource() throws InternalError { private synchronized DataSource getDataSource() throws InternalError, SQLException {
DatabaseConnectionDescriptor dbDescriptor=getDB(); DatabaseConnectionDescriptor dbDescriptor=getDB();
if(!datasources.containsKey(dbDescriptor.getUrl())) { if(!datasources.containsKey(dbDescriptor.getUrl())) {
@ -84,7 +85,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
return datasources.get(dbDescriptor.getUrl()); return datasources.get(dbDescriptor.getUrl());
} }
private DataSource setupDataSource(DatabaseConnectionDescriptor db) { private DataSource setupDataSource(DatabaseConnectionDescriptor db) throws SQLException, InternalError {
log.trace("Setting up data source for {} ",db); log.trace("Setting up data source for {} ",db);
@ -113,6 +114,21 @@ public class ConnectionManagerImpl implements ConnectionManager {
PoolingDataSource<PoolableConnection> dataSource = PoolingDataSource<PoolableConnection> dataSource =
new PoolingDataSource<>(connectionPool); new PoolingDataSource<>(connectionPool);
log.trace("Initializing schema...");
Connection conn=null;
try{
conn=dataSource.getConnection();
conn.createStatement().executeUpdate(Queries.getInitDB(db.getFlavor()));
conn.commit();
}catch(SQLException e) {
throw new InternalError("Unable to Init database "+db,e);
}finally {
if(conn!=null) conn.close();
}
return dataSource; return dataSource;
} }

View File

@ -33,6 +33,8 @@ public class DBField {
fields.put(REPORT_URL, new DBField(Types.VARCHAR,REPORT_URL)); fields.put(REPORT_URL, new DBField(Types.VARCHAR,REPORT_URL));
fields.put(START, new DBField(Types.TIMESTAMP,START)); fields.put(START, new DBField(Types.TIMESTAMP,START));
fields.put(END, new DBField(Types.TIMESTAMP,END)); fields.put(END, new DBField(Types.TIMESTAMP,END));
fields.put(COLLECTORS, new DBField(Types.VARCHAR,COLLECTORS));
fields.put(CONTROLLERS, new DBField(Types.VARCHAR,CONTROLLERS));
} }
} }

View File

@ -6,10 +6,10 @@ import java.util.Map.Entry;
public class DBQueryDescriptor { public class DBQueryDescriptor {
private Map<DBField,Object> condition; private Map<DBField,Object> condition=new HashMap<DBField,Object>();
public DBQueryDescriptor() { public DBQueryDescriptor() {
condition=new HashMap<DBField,Object>(); // TODO Auto-generated constructor stub
} }
public Map<DBField, Object> getCondition() { public Map<DBField, Object> getCondition() {
@ -23,13 +23,13 @@ public class DBQueryDescriptor {
} }
public DBQueryDescriptor(DBField field, Object value) { public DBQueryDescriptor(DBField field, Object value) {
this();
add(field,value); add(field,value);
} }
public String toString() { public String toString() {
StringBuilder builder=new StringBuilder(); StringBuilder builder=new StringBuilder();
if(condition.isEmpty())return "EMPTY";
for(Entry<DBField,Object> entry : condition.entrySet()) { for(Entry<DBField,Object> entry : condition.entrySet()) {
builder.append(String.format("%1$s = %2$s AND ", entry.getKey().getFieldName(),entry.getValue())); builder.append(String.format("%1$s = %2$s AND ", entry.getKey().getFieldName(),entry.getValue()));
} }

View File

@ -2,15 +2,21 @@ package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
public class DatabaseConnectionDescriptor { public class DatabaseConnectionDescriptor {
public static enum Flavor{
POSTGRES,MYSQL
}
private String username; private String username;
private String url; private String url;
private String password; private String password;
private Flavor flavor=Flavor.POSTGRES;
@Override @Override
public String toString() { public String toString() {
return "DatabaseConnectionDescriptor [username=" + username + ", url=" + url + "]"; return "DatabaseConnectionDescriptor [username=" + username + ", url=" + url + ", password=" + password
+ ", flavor=" + flavor + "]";
} }
public DatabaseConnectionDescriptor(String username, String url, String password) { public DatabaseConnectionDescriptor(String username, String url, String password) {
@ -20,6 +26,11 @@ public class DatabaseConnectionDescriptor {
this.password = password; this.password = password;
} }
public DatabaseConnectionDescriptor(String username, String url, String password, Flavor flavor) {
this(username,url,password);
this.flavor = flavor;
}
public String getUsername() { public String getUsername() {
return username; return username;
} }
@ -32,6 +43,8 @@ public class DatabaseConnectionDescriptor {
return password; return password;
} }
public Flavor getFlavor() {
return flavor;
}
} }

View File

@ -48,8 +48,8 @@ public class PersistenceManagerImpl implements PersistenceManager {
}else { }else {
log.debug("Inserting request .."); log.debug("Inserting request ..");
// PREPARE REQUEST // PREPARE REQUEST
PreparedStatement psInsert=Queries.INSERT_NEW.prepare(conn, Statement.RETURN_GENERATED_KEYS); PreparedStatement psInsert=Queries.INSERT_NEW.prepare(conn, Statement.RETURN_GENERATED_KEYS);
psInsert=Queries.INSERT_NEW.fill(psInsert, queryDescriptor);
psInsert.executeUpdate(); psInsert.executeUpdate();
ResultSet rsId=psInsert.getGeneratedKeys(); ResultSet rsId=psInsert.getGeneratedKeys();
rsId.next(); rsId.next();
@ -69,7 +69,7 @@ public class PersistenceManagerImpl implements PersistenceManager {
throw new PersistenceError(t); throw new PersistenceError(t);
}finally { }finally {
try { try {
conn.close(); if(conn!=null)conn.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new PersistenceError(e); throw new PersistenceError(e);
} }
@ -94,7 +94,7 @@ public class PersistenceManagerImpl implements PersistenceManager {
throw new PersistenceError(t); throw new PersistenceError(t);
}finally { }finally {
try { try {
conn.close(); if(conn!=null)conn.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new PersistenceError(e); throw new PersistenceError(e);
} }
@ -120,7 +120,7 @@ public class PersistenceManagerImpl implements PersistenceManager {
throw new PersistenceError(t); throw new PersistenceError(t);
}finally { }finally {
try { try {
conn.close(); if(conn!=null)conn.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new PersistenceError(e); throw new PersistenceError(e);
} }
@ -143,7 +143,7 @@ public class PersistenceManagerImpl implements PersistenceManager {
throw new PersistenceError(t); throw new PersistenceError(t);
}finally { }finally {
try { try {
conn.close(); if(conn!=null)conn.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new PersistenceError(e); throw new PersistenceError(e);
} }
@ -166,7 +166,7 @@ public class PersistenceManagerImpl implements PersistenceManager {
throw new PersistenceError(t); throw new PersistenceError(t);
}finally { }finally {
try { try {
conn.close(); if(conn!=null)conn.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new PersistenceError(e); throw new PersistenceError(e);
} }

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DatabaseConnectionDescriptor.Flavor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionRequest; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionRequest;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionStatus; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionStatus;
@ -29,6 +30,22 @@ import org.gcube.data.publishing.gCatFeeder.service.model.fault.InvalidRequest;
public class Queries { public class Queries {
public static final String getInitDB(DatabaseConnectionDescriptor.Flavor flavor) {
return "CREATE TABLE IF NOT EXISTS "+TABLE+" ("
+ID+" "+(flavor.equals(Flavor.POSTGRES)?"BIGSERIAL":"bigint auto_increment") +" NOT NULL,"
+CALLER_TOKEN+" VARCHAR NOT NULL,"
+CALLER_ID+" VARCHAR NOT NULL,"
+CALLER_CONTEXT+" VARCHAR NOT NULL,"
+STATUS+" VARCHAR(40) NOT NULL,"
+REPORT_URL+" VARCHAR,"
+COLLECTORS+" text,"
+CONTROLLERS+" text,"
+START+" timestamp with time zone,"
+END+" timestamp with time zone,"
+"primary key ("+ID+"))";
}
public static final Query GET_BY_ID=new Query("Select * from "+TABLE+" where "+ID+" = ?", public static final Query GET_BY_ID=new Query("Select * from "+TABLE+" where "+ID+" = ?",
new DBField[] {fields.get(ID)}); new DBField[] {fields.get(ID)});
@ -41,12 +58,15 @@ public class Queries {
fields.get(ID)}); fields.get(ID)});
public static final Query ACQUIRE= new Query("UPDATE "+TABLE+" SET " public static final Query ACQUIRE= new Query("UPDATE "+TABLE+" SET "
+STATUS+"="+ExecutionStatus.RUNNING+" WHERE "+ID+"=? AND "+STATUS+"="+ExecutionStatus.PENDING, +STATUS+"='"+ExecutionStatus.RUNNING+"' WHERE "+ID+"=? AND "+STATUS+"='"+ExecutionStatus.PENDING+"'",
new DBField[] {fields.get(ID)}); new DBField[] {fields.get(ID)});
public static final Query GET_ALL= new Query("SELECT * FROM "+TABLE+" ORDER BY "+END+" DESC", public static final Query GET_ALL= new Query("SELECT * FROM "+TABLE+" ORDER BY "+END+" DESC",
new DBField[] {}); new DBField[] {});
/* /*
* SAME CONTEXT, * SAME CONTEXT,
* status is RUNNING OR PENDING * status is RUNNING OR PENDING
@ -55,7 +75,7 @@ public class Queries {
*/ */
public static final Query GET_SIMILAR=new Query ("SELECT * FROM "+TABLE+" WHERE " public static final Query GET_SIMILAR=new Query ("SELECT * FROM "+TABLE+" WHERE "
+CALLER_CONTEXT+"=? AND "+COLLECTORS+"=? AND "+CONTROLLERS+"=? AND " +CALLER_CONTEXT+"=? AND "+COLLECTORS+"=? AND "+CONTROLLERS+"=? AND "
/* STATUS */ +"("+STATUS+"="+ExecutionStatus.RUNNING+" OR "+STATUS+"="+ExecutionStatus.PENDING+")", /* STATUS */ +"("+STATUS+"='"+ExecutionStatus.RUNNING+"' OR "+STATUS+"='"+ExecutionStatus.PENDING+"')",
new DBField[] {fields.get(CALLER_CONTEXT), new DBField[] {fields.get(CALLER_CONTEXT),
fields.get(COLLECTORS), fields.get(COLLECTORS),
fields.get(CONTROLLERS)}); fields.get(CONTROLLERS)});
@ -66,12 +86,10 @@ public class Queries {
+CALLER_ID+"," +CALLER_ID+","
+CALLER_CONTEXT+"," +CALLER_CONTEXT+","
+STATUS+"," +STATUS+","
+REPORT_URL+","
+COLLECTORS+"," +COLLECTORS+","
+CONTROLLERS+"," +CONTROLLERS+") VALUES (?,?,?,'"+ExecutionStatus.PENDING+"',?,?)",
+START+") VALUES (?,?,?,?,?,?,?,?)",
new DBField[] {fields.get(CALLER_TOKEN),fields.get(CALLER_ID),fields.get(CALLER_CONTEXT), new DBField[] {fields.get(CALLER_TOKEN),fields.get(CALLER_ID),fields.get(CALLER_CONTEXT),
fields.get(STATUS),fields.get(REPORT_URL),fields.get(COLLECTORS),fields.get(CONTROLLERS),fields.get(START)}); fields.get(COLLECTORS),fields.get(CONTROLLERS)});
public static final ExecutionDescriptor translateRow(ResultSet row) throws SQLException { public static final ExecutionDescriptor translateRow(ResultSet row) throws SQLException {
ExecutionDescriptor toReturn=new ExecutionDescriptor(); ExecutionDescriptor toReturn=new ExecutionDescriptor();
@ -86,7 +104,9 @@ public class Queries {
toReturn.setId(row.getLong(ID)); toReturn.setId(row.getLong(ID));
toReturn.setReportUrl(row.getString(REPORT_URL)); toReturn.setReportUrl(row.getString(REPORT_URL));
toReturn.setStartTime(row.getTimestamp(START).toInstant()); Timestamp startTime=row.getTimestamp(START);
if(startTime!=null)
toReturn.setStartTime(startTime.toInstant());
toReturn.setStatus(ExecutionStatus.valueOf(row.getString(STATUS))); toReturn.setStatus(ExecutionStatus.valueOf(row.getString(STATUS)));
return toReturn; return toReturn;
} }

View File

@ -1,14 +1,75 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence; package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
import javax.inject.Inject;
import org.gcube.contentmanagement.blobstorage.service.IClient;
import org.gcube.contentmanagement.blobstorage.transport.backend.RemoteBackendException;
import org.gcube.contentmanager.storageclient.wrapper.AccessType;
import org.gcube.contentmanager.storageclient.wrapper.MemoryType;
import org.gcube.contentmanager.storageclient.wrapper.StorageClient;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage; import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError; import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
import org.gcube.data.publishing.gCatFeeder.service.model.reports.ExecutionReport; import org.gcube.data.publishing.gCatFeeder.service.model.reports.ExecutionReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
public class StorageImpl implements Storage{ public class StorageImpl implements Storage{
private static final Logger log= LoggerFactory.getLogger(StorageImpl.class);
protected static ObjectMapper mapper=new ObjectMapper();
@Inject
private Infrastructure infra;
private final IClient getClient(){
return new StorageClient("data-publishing", "gcat-feeder", infra.getClientID(infra.getCurrentToken()), AccessType.SHARED, MemoryType.PERSISTENT).getClient();
}
//return Id
private final String putOntoStorage(File source) throws RemoteBackendException, FileNotFoundException{
IClient client=getClient();
log.debug("Uploading local file "+source.getAbsolutePath());
String id=client.put(true).LFile(new FileInputStream(source)).RFile(UUID.randomUUID().toString());
log.debug("File uploaded. ID : "+id);
String toReturn= client.getHttpUrl().RFile(id);
log.debug("Created URL : "+toReturn);
return toReturn;
}
protected File asFile(ExecutionReport report) throws InternalError {
try {
File f=File.createTempFile("report", ".json");
String serialized=mapper.writeValueAsString(report);
Files.write(Paths.get(f.getAbsolutePath()), serialized.getBytes());
return f;
} catch (IOException e) {
throw new InternalError("Unable to rite report : ",e);
}
}
@Override @Override
public String storeReport(ExecutionReport report) throws InternalError { public String storeReport(ExecutionReport report) throws InternalError {
throw new RuntimeException("Implment THIS"); try {
return putOntoStorage(asFile(report));
} catch (RemoteBackendException | FileNotFoundException e) {
throw new InternalError("Unable to store report ",e);
}
} }
} }

View File

@ -1,14 +1,15 @@
package org.gcube.data.publishing.gCatFeeder.service.model; package org.gcube.data.publishing.gCatFeeder.service.model;
import java.time.Instant; import java.time.Instant;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
public class ExecutionDescriptor { public class ExecutionDescriptor {
private Long id; private Long id;
private Set<String> collectors; private Set<String> collectors=new HashSet<>();
private Set<String> catalogues; private Set<String> catalogues=new HashSet<>();
private String callerEncryptedToken; private String callerEncryptedToken;

View File

@ -0,0 +1,6 @@
db.pools.max_idle=5
db.pools.max_total=50
db.pools.min_total=3
mapping-db.ep.name=Feeder_DB
mapping-db.ep.category=Database

View File

@ -2,40 +2,61 @@ package org.gcube.data.publishing.gCatFeeder.service;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Properties;
import javax.inject.Singleton; import javax.inject.Singleton;
import javax.ws.rs.core.Application; import javax.ws.rs.core.Application;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager; import org.gcube.data.publishing.gCatFeeder.service.engine.CatalogueControllersManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager; import org.gcube.data.publishing.gCatFeeder.service.engine.CollectorsManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager; import org.gcube.data.publishing.gCatFeeder.service.engine.ExecutionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.FeederEngine; import org.gcube.data.publishing.gCatFeeder.service.engine.FeederEngine;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure; import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager; import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage; import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CatalogueControllersManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CatalogueControllersManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.InfrastructureUtilsImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.LocalConfigurationImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.ConnectionManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.PersistenceManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.StorageImpl;
import org.gcube.data.publishing.gCatFeeder.service.mockups.InfrastructureMockup; import org.gcube.data.publishing.gCatFeeder.service.mockups.InfrastructureMockup;
import org.gcube.data.publishing.gCatFeeder.service.mockups.PersistenceManagerMock; import org.gcube.data.publishing.gCatFeeder.service.mockups.PersistenceManagerMock;
import org.gcube.data.publishing.gCatFeeder.service.mockups.StorageMockup; import org.gcube.data.publishing.gCatFeeder.service.mockups.StorageMockup;
import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.test.JerseyTest; import org.glassfish.jersey.test.JerseyTest;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
public class BaseTest extends JerseyTest{ public class BaseTest extends JerseyTest{
private static String testContext=null;
@BeforeClass
public static void checkEnvironment() {
testContext=System.getProperty("testContext");
System.out.println("TEST CONTEXT = "+testContext);
}
@Before @Before
public void init() throws IOException, SQLException{ public void init() throws IOException, SQLException{
setTestInfrastructure();
} }
@Override @Override
protected Application configure() { protected Application configure() {
if(testContext!=null) {
System.out.println("TEST INFRASTRUCTURE IS "+testContext);
AbstractBinder binder = new AbstractBinder() { AbstractBinder binder = new AbstractBinder() {
@Override @Override
protected void configure() { protected void configure() {
@ -43,16 +64,53 @@ public class BaseTest extends JerseyTest{
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class).in(Singleton.class); bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class).in(Singleton.class);
bind(CollectorsManagerImpl.class).to(CollectorsManager.class).in(Singleton.class); bind(CollectorsManagerImpl.class).to(CollectorsManager.class).in(Singleton.class);
bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class); bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class);
bind(PersistenceManagerMock.class).to(PersistenceManager.class).in(Singleton.class); bind(StorageImpl.class).to(Storage.class);
bind(PersistenceManagerImpl.class).to(PersistenceManager.class);
bind(ConnectionManagerImpl.class).to(ConnectionManager.class).in(Singleton.class);
bind(LocalConfigurationImpl.class).to(LocalConfiguration.class).in(Singleton.class);
//Mockup
bind(InfrastructureMockup.class).to(Infrastructure.class);
}
};
return new GCatFeeder(binder);
}else {
System.out.println("NO TEST INFRASTRUCTURE AVAILABLE");
AbstractBinder binder = new AbstractBinder() {
@Override
protected void configure() {
bind(FeederEngineImpl.class).to(FeederEngine.class);
bind(CatalogueControllersManagerImpl.class).to(CatalogueControllersManager.class).in(Singleton.class);
bind(CollectorsManagerImpl.class).to(CollectorsManager.class).in(Singleton.class);
bind(ExecutionManagerImpl.class).to(ExecutionManager.class).in(Singleton.class);
bind(PersistenceManagerImpl.class).to(PersistenceManager.class);
bind(ConnectionManagerImpl.class).to(ConnectionManager.class).in(Singleton.class);
bind(LocalConfigurationImpl.class).to(LocalConfiguration.class).in(Singleton.class);
// Mockups
bind(InfrastructureMockup.class).to(Infrastructure.class); bind(InfrastructureMockup.class).to(Infrastructure.class);
bind(StorageMockup.class).to(Storage.class); bind(StorageMockup.class).to(Storage.class);
} }
}; };
return new GCatFeeder(binder); return new GCatFeeder(binder);
}
} }
public static void setTestInfrastructure() {
if(isTestInfrastructureEnabled()) {
Properties props=new Properties();
try{
props.load(BaseTest.class.getResourceAsStream("/tokens.properties"));
}catch(IOException e) {throw new RuntimeException(e);}
if(!props.containsKey(testContext)) throw new RuntimeException("No token found for scope : "+testContext);
SecurityTokenProvider.instance.set(props.getProperty(testContext));
ScopeProvider.instance.set(testContext);
}
}
public static boolean isTestInfrastructureEnabled() {
return testContext!=null;
}
} }

View File

@ -5,6 +5,7 @@ import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor; import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class ExecutionsTest extends BaseTest { public class ExecutionsTest extends BaseTest {
@ -17,8 +18,11 @@ public class ExecutionsTest extends BaseTest {
System.out.println(target.getUri()); System.out.println(target.getUri());
Response resp=target.request().get(); Response resp=target.request().get();
if(resp.getStatus()!=200) {
System.err.println(resp.readEntity(String.class));
throw new RuntimeException("GetAll error should never happen");
}
System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class)); System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
if(resp.getStatus()!=200) throw new RuntimeException("GetAll error should never happen");
} }
@Test @Test
@ -33,8 +37,12 @@ public class ExecutionsTest extends BaseTest {
ExecutionDescriptor desc=resp.readEntity(ExecutionDescriptor.class); ExecutionDescriptor desc=resp.readEntity(ExecutionDescriptor.class);
Long id=desc.getId(); Long id=desc.getId();
waitForSuccess(id);
}
private void waitForSuccess(Long executionId) {
WebTarget pollTarget= WebTarget pollTarget=
target(ServiceConstants.Executions.PATH).path(id+""); target(ServiceConstants.Executions.PATH).path(executionId+"");
boolean end=false; boolean end=false;
do { do {
@ -57,6 +65,24 @@ public class ExecutionsTest extends BaseTest {
}while(!end); }while(!end);
} }
@Test
public void checkSimilar() {
WebTarget target=
target(ServiceConstants.Executions.PATH);
System.out.println(target.getUri());
Response resp=target.request().post(Entity.json(""));
// System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
ExecutionDescriptor desc=resp.readEntity(ExecutionDescriptor.class);
Long id=desc.getId();
resp=target.request().post(Entity.json(""));
Assert.assertEquals(id, resp.readEntity(ExecutionDescriptor.class).getId());
waitForSuccess(id);
}
@Test @Test
public void wrongSubmission() { public void wrongSubmission() {
WebTarget target= WebTarget target=

View File

@ -1,42 +1,97 @@
package org.gcube.data.publishing.gCatFeeder.service.mockups; package org.gcube.data.publishing.gCatFeeder.service.mockups;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure; import java.sql.SQLException;
public class InfrastructureMockup implements Infrastructure { import org.gcube.data.publishing.gCatFeeder.service.BaseTest;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.InfrastructureUtilsImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DatabaseConnectionDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DatabaseConnectionDescriptor.Flavor;
public class InfrastructureMockup extends InfrastructureUtilsImpl {
static {
try {
org.h2.tools.Server.createTcpServer().start();
} catch (SQLException e) {
throw new RuntimeException("Unable to init in memory DB.",e);
}
}
@Override @Override
public String getCurrentToken() { public String getCurrentToken() {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.getCurrentToken();
}
return "FAKE_TOKEN"; return "FAKE_TOKEN";
} }
@Override @Override
public String getCurrentContext() { public String getCurrentContext() {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.getCurrentContext();
}
return "FAKE_CONTEXT"; return "FAKE_CONTEXT";
} }
@Override @Override
public String getCurrentContextName() { public String getCurrentContextName() {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.getCurrentContextName();
}
return "FAKE"; return "FAKE";
} }
@Override @Override
public String getClientID(String token){ public String getClientID(String token){
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.getClientID(token);
}
return "FAKE_ID"; return "FAKE_ID";
} }
@Override @Override
public void setToken(String token) { public void setToken(String token) {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
super.setToken(token);
}
} }
@Override @Override
public String decrypt(String toDecrypt) { public String decrypt(String toDecrypt) {
return toDecrypt; if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.decrypt(toDecrypt);
}
else return toDecrypt;
} }
@Override @Override
public String encrypt(String toEncrypt) { public String encrypt(String toEncrypt) {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.encrypt(toEncrypt);
}
return toEncrypt; return toEncrypt;
} }
@Override
public DatabaseConnectionDescriptor queryForDatabase(String category, String name) throws InternalError {
if(BaseTest.isTestInfrastructureEnabled()) {
BaseTest.setTestInfrastructure();
return super.queryForDatabase(category, name);
}else {
String url="jdbc:h2:mem:test;DB_CLOSE_DELAY=-1";
// String url="jdbc:h2:tcp://localhost:9092//data;DB_CLOSE_DELAY=-1";
return new DatabaseConnectionDescriptor(null, url, null,Flavor.MYSQL);
}
}
} }

View File

@ -1,33 +1,17 @@
package org.gcube.data.publishing.gCatFeeder.service.mockups; package org.gcube.data.publishing.gCatFeeder.service.mockups;
import java.io.File; import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.StorageImpl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError; import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;
import org.gcube.data.publishing.gCatFeeder.service.model.reports.ExecutionReport; import org.gcube.data.publishing.gCatFeeder.service.model.reports.ExecutionReport;
import com.fasterxml.jackson.databind.ObjectMapper; public class StorageMockup extends StorageImpl {
public class StorageMockup implements Storage {
private static ObjectMapper mapper=new ObjectMapper();
@Override @Override
public String storeReport(ExecutionReport report) throws InternalError { public String storeReport(ExecutionReport report) throws InternalError {
try { return asFile(report).getAbsolutePath();
File f=File.createTempFile("report", ".json");
String serialized=mapper.writeValueAsString(report);
Files.write(Paths.get(f.getAbsolutePath()), serialized.getBytes());
return f.getAbsolutePath();
} catch (IOException e) {
throw new InternalError("Unable to rite report : ",e);
}
} }
} }