Fabio Sinibaldi 2019-03-26 18:05:06 +00:00
parent f2bd178877
commit 9705974338
15 changed files with 439 additions and 36 deletions

View File

@@ -76,11 +76,10 @@
<!-- Persistence -->
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>eclipselink</artifactId>
<version>2.6.6</version>
<scope>compile</scope>
</dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.0.1</version>
</dependency>
<!-- test -->

View File

@@ -14,7 +14,7 @@ import org.gcube.data.publishing.gCatFeeder.service.engine.impl.CollectorsManage
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.ExecutionManagerImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.FeederEngineImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.InfrastructureUtilsImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.StorageImpl;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.StorageImpl;
import org.gcube.data.publishing.gCatFeeder.service.rest.Capabilities;
import org.gcube.data.publishing.gCatFeeder.service.rest.Executions;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

View File

@@ -1,10 +1,11 @@
package org.gcube.data.publishing.gCatFeeder.service.engine;
import java.sql.Connection;
import java.sql.SQLException;
public interface ConnectionManager {
public Connection getConnection();
public Connection getConnection() throws SQLException, InternalError;
}
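
Note on the new signature: getConnection() now declares both SQLException and InternalError (presumably the service's own model.fault.InternalError imported elsewhere in this changeset, not java.lang.InternalError), so callers have to deal with both. A minimal consumer sketch, assuming a hypothetical executions table and id; connections produced by the new ConnectionManagerImpl arrive with autoCommit disabled, so the caller commits or rolls back explicitly:

// Sketch only: markRunning() and the table/column names are illustrative, not part of this commit.
// Requires java.sql.Connection, java.sql.PreparedStatement, java.sql.SQLException.
void markRunning(ConnectionManager manager, long executionId) throws SQLException, InternalError {
    Connection conn = null;
    try {
        conn = manager.getConnection();
        try (PreparedStatement ps = conn.prepareStatement(
                "UPDATE executions SET status=? WHERE id=?")) {
            ps.setString(1, "RUNNING");
            ps.setLong(2, executionId);
            ps.executeUpdate();
        }
        conn.commit();                      // autoCommit is false on pooled connections
    } catch (SQLException e) {
        if (conn != null) conn.rollback();  // undo partial work before rethrowing
        throw e;
    } finally {
        if (conn != null) conn.close();     // returns the connection to the pool
    }
}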

View File

@@ -1,5 +1,7 @@
package org.gcube.data.publishing.gCatFeeder.service.engine;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DatabaseConnectionDescriptor;
public interface Infrastructure {
public String getCurrentToken();
@@ -12,4 +14,9 @@ public interface Infrastructure {
public String encrypt(String toEncrypt);
public String decrypt(String toDecrypt);
//
public DatabaseConnectionDescriptor queryForDatabase(String category,String name) throws InternalError;
}

View File

@@ -0,0 +1,17 @@
package org.gcube.data.publishing.gCatFeeder.service.engine;
public interface LocalConfiguration {
public static final String POOL_MAX_IDLE="db.pools.max_idle";
public static final String POOL_MAX_TOTAL="db.pools.max_total";
public static final String POOL_MIN_IDLE="db.pools.min_total";
public static final String MAPPING_DB_ENDPOINT_NAME="db.ep.name";
public static final String MAPPING_DB_ENDPOINT_CATEGORY="db.ep.category";
public String getProperty(String propertyName);
}
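
The interface only fixes the property keys; how they are resolved is left to an implementation that is not part of this diff. One detail worth a second look: POOL_MIN_IDLE points at the key "db.pools.min_total"; if that is a copy-paste slip, "db.pools.min_idle" would be the consistent name. A minimal sketch of a classpath-properties-backed implementation, assuming a hypothetical config.properties resource:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Sketch only: the resource name and the class itself are assumptions, not part of this commit.
public class LocalConfigurationImpl implements LocalConfiguration {

    private final Properties props = new Properties();

    public LocalConfigurationImpl() {
        try (InputStream in = getClass().getClassLoader().getResourceAsStream("config.properties")) {
            if (in != null) props.load(in);
        } catch (IOException e) {
            throw new RuntimeException("Unable to load service configuration", e);
        }
    }

    @Override
    public String getProperty(String propertyName) {
        return props.getProperty(propertyName);
    }
}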

View File

@@ -1,12 +1,27 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
import java.util.List;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DatabaseConnectionDescriptor;
import org.gcube.data.publishing.gCatFeeder.utils.CommonUtils;
import org.gcube.data.publishing.gCatFeeder.utils.ContextUtils;
import org.gcube.data.publishing.gCatFeeder.utils.TokenUtils;
import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InfrastructureUtilsImpl implements Infrastructure {
private static final Logger log= LoggerFactory.getLogger(InfrastructureUtilsImpl.class);
@Override
public String getCurrentToken() {
return TokenUtils.getCurrentToken();
@@ -47,4 +62,33 @@ public class InfrastructureUtilsImpl implements Infrastructure {
public String encrypt(String toEncrypt) {
return CommonUtils.encryptString(toEncrypt);
}
@Override
public DatabaseConnectionDescriptor queryForDatabase(String category, String name) throws InternalError {
log.debug("Querying for DB {},{} under {}",category,name,getCurrentContext());
SimpleQuery query = queryFor(ServiceEndpoint.class);
query.addCondition("$resource/Profile/Category/text() eq '"+category+"'");
query.addCondition("$resource/Profile/Name/text() eq '"+name+"'");
DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
List<ServiceEndpoint> found=client.submit(query);
if(found.size()==0) throw new InternalError("Unable to find DB "+category+"/"+name+" in "+getCurrentContext());
if(found.size()>1) log.warn("Multiple DB "+category+"/"+name+" in "+getCurrentContext());
AccessPoint point= found.get(0).profile().accessPoints().iterator().next();
String url="jdbc:postgresql://"+point.address()+"/"+point.name();
DatabaseConnectionDescriptor toReturn= new DatabaseConnectionDescriptor(point.username(), url, CommonUtils.decryptString(point.password()));
log.debug("Going to use DB : "+toReturn);
return toReturn;
}
}
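
queryForDatabase() looks up a ServiceEndpoint by Category/Name in the current context, takes its first AccessPoint and derives a plain PostgreSQL JDBC URL from the access point address and name. A purely illustrative example of the resulting descriptor, with made-up endpoint values:

// Hypothetical AccessPoint values, not defined by this commit:
//   address = "pg.example.org:5432", name = "gcat_feeder", username = "feeder"
DatabaseConnectionDescriptor db = new DatabaseConnectionDescriptor(
        "feeder",
        "jdbc:postgresql://pg.example.org:5432/gcat_feeder",
        "plain-text-password");   // produced by CommonUtils.decryptString(point.password()) above

Note that DatabaseConnectionDescriptor.toString() (below) omits the password, so the "Going to use DB" debug line does not leak credentials.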

View File

@@ -0,0 +1,119 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnection;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.LocalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionManagerImpl implements ConnectionManager {
static {
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
System.err.println("PERFORM SERVICE - UNABLE TO REGISTER postgresql DRIVER");
e.printStackTrace(System.err);
throw new RuntimeException(e);
}
}
private static final Logger log= LoggerFactory.getLogger(ConnectionManagerImpl.class);
// ************************************** INSTANCE
// Endpoint -> datasource
private ConcurrentHashMap<String,DataSource> datasources=new ConcurrentHashMap<>();
// Scope -> db
private ConcurrentHashMap<String,DatabaseConnectionDescriptor> databases=new ConcurrentHashMap<>();
/**
* Manages db connection pools by scope
*
*
*/
@Inject
private Infrastructure infrastructure;
@Inject
private LocalConfiguration configuration;
private synchronized DatabaseConnectionDescriptor getDB() throws InternalError {
if(!databases.containsKey(infrastructure.getCurrentContext()))
databases.put(infrastructure.getCurrentContext(), infrastructure.queryForDatabase(
configuration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY),
configuration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME)));
return databases.get(infrastructure.getCurrentContext());
}
@Override
public Connection getConnection() throws SQLException, InternalError {
DataSource ds=getDataSource();
Connection conn=ds.getConnection();
conn.setAutoCommit(false);
return conn;
}
private synchronized DataSource getDataSource() throws InternalError {
DatabaseConnectionDescriptor dbDescriptor=getDB();
if(!datasources.containsKey(dbDescriptor.getUrl())) {
datasources.put(dbDescriptor.getUrl(), setupDataSource(dbDescriptor));
}
return datasources.get(dbDescriptor.getUrl());
}
private DataSource setupDataSource(DatabaseConnectionDescriptor db) {
log.trace("Setting up data source for {} ",db);
GenericObjectPoolConfig poolConfig=new GenericObjectPoolConfig();
poolConfig.setMaxIdle(Integer.parseInt(configuration.getProperty(LocalConfiguration.POOL_MAX_IDLE)));
poolConfig.setMaxTotal(Integer.parseInt(configuration.getProperty(LocalConfiguration.POOL_MAX_TOTAL)));
poolConfig.setMinIdle(Integer.parseInt(configuration.getProperty(LocalConfiguration.POOL_MIN_IDLE)));
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(db.getUrl(),db.getUsername(),db.getPassword());
PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
ObjectPool<PoolableConnection> connectionPool =
new GenericObjectPool<>(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
PoolingDataSource<PoolableConnection> dataSource =
new PoolingDataSource<>(connectionPool);
return dataSource;
}
}
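
One thing worth flagging in setupDataSource(): poolConfig is filled from the three pool properties but, as committed, is never handed to the pool, so max_idle/max_total/min_idle have no effect on the GenericObjectPool that is actually created. A sketch of how the config would normally be applied with commons-pool2 (not part of this commit):

// Pass the configured limits to the pool instead of relying on the defaults.
ObjectPool<PoolableConnection> connectionPool =
        new GenericObjectPool<>(poolableConnectionFactory, poolConfig);
poolableConnectionFactory.setPool(connectionPool);

A validation query (poolableConnectionFactory.setValidationQuery("SELECT 1")) together with poolConfig.setTestOnBorrow(true) is a common further addition, so stale connections are discarded before being handed out.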

View File

@@ -11,18 +11,28 @@ public class DBField {
public static final Map<String,DBField> fields=new HashMap<>();
public static final String ID="id";
public static final String ID="id";
public static final String CALLER_TOKEN="caller_token";
public static final String CALLER_ID="caller_id";
public static final String CALLER_CONTEXT="caller_context";
public static final String STATUS = "status";
public static final String REPORT_URL="report_url";
public static final String COLLECTORS="collectors";
public static final String CONTROLLERS="controllers";
public static final String START="start_time";
public static final String END="end_time";
public static final String STATUS="status";
public static final String CALLER="caller";
static {
fields.put(ID, new DBField(Types.BIGINT,ID));
fields.put(CALLER_TOKEN, new DBField(Types.VARCHAR,CALLER_TOKEN));
fields.put(CALLER_ID, new DBField(Types.VARCHAR,CALLER_ID));
fields.put(CALLER_CONTEXT, new DBField(Types.VARCHAR,CALLER_CONTEXT));
fields.put(STATUS, new DBField(Types.VARCHAR,STATUS));
fields.put(REPORT_URL, new DBField(Types.VARCHAR,REPORT_URL));
fields.put(START, new DBField(Types.TIMESTAMP,START));
fields.put(END, new DBField(Types.TIMESTAMP,END));
fields.put(STATUS, new DBField(Types.VARCHAR,STATUS));
fields.put(CALLER, new DBField(Types.VARCHAR,CALLER));
}
}
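
Each column constant is registered in the fields map together with its java.sql.Types code, so queries can bind parameters from column metadata instead of by hand. A hedged sketch of what such a binding helper can look like; the getType() accessor on DBField and getValue() on DBQueryDescriptor are assumptions, since neither class body appears in this diff:

// Sketch only: the accessor names are assumed, not confirmed by this commit.
static void bind(PreparedStatement ps, DBField[] params, DBQueryDescriptor values) throws SQLException {
    for (int i = 0; i < params.length; i++) {
        Object value = values.getValue(params[i]);               // assumed accessor
        if (value == null) ps.setNull(i + 1, params[i].getType());
        else ps.setObject(i + 1, value, params[i].getType());    // assumed accessor
    }
}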

View File

@@ -0,0 +1,37 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
public class DatabaseConnectionDescriptor {
private String username;
private String url;
private String password;
@Override
public String toString() {
return "DatabaseConnectionDescriptor [username=" + username + ", url=" + url + "]";
}
public DatabaseConnectionDescriptor(String username, String url, String password) {
super();
this.username = username;
this.url = url;
this.password = password;
}
public String getUsername() {
return username;
}
public String getUrl() {
return url;
}
public String getPassword() {
return password;
}
}

View File

@@ -1,4 +1,4 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.ID;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.fields;
@@ -15,8 +15,6 @@ import javax.inject.Inject;
import org.gcube.data.publishing.gCatFeeder.service.engine.ConnectionManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.PersistenceManager;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBQueryDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.Queries;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionRequest;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.ElementNotFound;

View File

@@ -1,35 +1,148 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.CALLER_CONTEXT;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.CALLER_ID;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.CALLER_TOKEN;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.COLLECTORS;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.CONTROLLERS;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.END;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.ID;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.REPORT_URL;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.START;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.STATUS;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.TABLE;
import static org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBField.ExecutionDescriptor.fields;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionRequest;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionStatus;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InvalidRequest;
public class Queries {
public static final Query GET_BY_ID;
public static final Query UPDATE;
public static final Query ACQUIRE;
public static final Query GET_ALL;
public static final Query GET_SIMILAR;
public static final Query INSERT_NEW;
public static final ExecutionDescriptor translateRow(ResultSet row) {
public static final Query GET_BY_ID=new Query("Select * from "+TABLE+" where "+ID+" = ?",
new DBField[] {fields.get(ID)});
public static final Query UPDATE=new Query("UPDATE "+TABLE+" SET "
+START+"=?, "+END+"=?, "+ STATUS+"=?, "+REPORT_URL+"=? WHERE "+ID+"=?",
new DBField[] {fields.get(START),
fields.get(END),
fields.get(STATUS),
fields.get(REPORT_URL),
fields.get(ID)});
public static final Query ACQUIRE= new Query("UPDATE "+TABLE+" SET "
+STATUS+"="+ExecutionStatus.RUNNING+" WHERE "+ID+"=? AND "+STATUS+"="+ExecutionStatus.PENDING,
new DBField[] {fields.get(ID)});
public static final Query GET_ALL= new Query("SELECT * FROM "+TABLE+" ORDER BY "+END+" DESC",
new DBField[] {});
/*
* SAME CONTEXT,
* status is RUNNING OR PENDING
* SAME COLLECTORS
* SAME CATALOGUES
*/
public static final Query GET_SIMILAR=new Query ("SELECT * FROM "+TABLE+" WHERE "
+CALLER_CONTEXT+"=? AND "+COLLECTORS+"=? AND "+CONTROLLERS+"=? AND "
/* STATUS */ +"("+STATUS+"="+ExecutionStatus.RUNNING+" OR "+STATUS+"="+ExecutionStatus.PENDING+")",
new DBField[] {fields.get(CALLER_CONTEXT),
fields.get(COLLECTORS),
fields.get(CONTROLLERS)});
public static final Query INSERT_NEW=new Query ("INSERT INTO "+TABLE+" ("
+CALLER_TOKEN+","
+CALLER_ID+","
+CALLER_CONTEXT+","
+STATUS+","
+REPORT_URL+","
+COLLECTORS+","
+CONTROLLERS+","
+START+") VALUES (?,?,?,?,?,?,?,?)",
new DBField[] {fields.get(CALLER_TOKEN),fields.get(CALLER_ID),fields.get(CALLER_CONTEXT),
fields.get(STATUS),fields.get(REPORT_URL),fields.get(COLLECTORS),fields.get(CONTROLLERS),fields.get(START)});
public static final ExecutionDescriptor translateRow(ResultSet row) throws SQLException {
ExecutionDescriptor toReturn=new ExecutionDescriptor();
toReturn.setCallerContext(row.getString(CALLER_CONTEXT));
toReturn.setCallerEncryptedToken(row.getString(CALLER_TOKEN));
toReturn.setCallerIdentity(row.getString(CALLER_ID));
toReturn.getCatalogues().addAll(fromField(row.getString(CONTROLLERS)));
toReturn.getCollectors().addAll(fromField(row.getString(COLLECTORS)));
Timestamp endTime=row.getTimestamp(END);
if(endTime!=null)
toReturn.setEndTime(endTime.toInstant());
toReturn.setId(row.getLong(ID));
toReturn.setReportUrl(row.getString(REPORT_URL));
toReturn.setStartTime(row.getTimestamp(START).toInstant());
toReturn.setStatus(ExecutionStatus.valueOf(row.getString(STATUS)));
return toReturn;
}
public static final DBQueryDescriptor translateObject(ExecutionDescriptor descriptor) throws InvalidRequest{
try{
return new DBQueryDescriptor().
add(fields.get(CALLER_CONTEXT),descriptor.getCallerContext()).
add(fields.get(CALLER_TOKEN),descriptor.getCallerEncryptedToken()).
add(fields.get(CALLER_ID),descriptor.getCallerIdentity()).
add(fields.get(CONTROLLERS),toField(descriptor.getCatalogues())).
add(fields.get(COLLECTORS),toField(descriptor.getCollectors())).
add(fields.get(END),descriptor.getEndTime()!=null?java.sql.Timestamp.from(descriptor.getEndTime()):null).
add(fields.get(ID),descriptor.getId()).
add(fields.get(REPORT_URL),descriptor.getReportUrl()).
add(fields.get(START),descriptor.getStartTime()!=null?java.sql.Timestamp.from(descriptor.getStartTime()):null).
add(fields.get(STATUS),descriptor.getStatus().toString());
}catch(Throwable t) {
throw new InvalidRequest(t);
}
}
public static final DBQueryDescriptor translateObject(ExecutionRequest request) throws InvalidRequest{
try{
return new DBQueryDescriptor().
add(fields.get(CALLER_CONTEXT),request.getContext()).
add(fields.get(CALLER_TOKEN),request.getEncryptedToken()).
add(fields.get(CALLER_ID),request.getCallerID()).
add(fields.get(CONTROLLERS),toField(request.getToInvokeControllers())).
add(fields.get(COLLECTORS),toField(request.getToInvokeCollectors()));
// add(fields.get(END),descriptor.getEndTime()!=null?java.sql.Timestamp.from(descriptor.getEndTime()):null).
// add(fields.get(ID),descriptor.getId()).
// add(fields.get(REPORT_URL),descriptor.getReportUrl()).
// add(fields.get(START),descriptor.getStartTime()!=null?java.sql.Timestamp.from(descriptor.getStartTime()):null).
// add(fields.get(STATUS),descriptor.getStatus().toString());
}catch(Throwable t) {
throw new InvalidRequest(t);
}
}
private static final Set<String> fromField(String fieldValue){
HashSet<String> toReturn=new HashSet<>();
for(String v:fieldValue.split(","))
toReturn.add(v);
return toReturn;
}
private static final String toField(Set<String> values){
StringBuilder toReturn=new StringBuilder();
if(values!=null&&!values.isEmpty()) {
ArrayList<String> sorted=new ArrayList<String>(values);
Collections.sort(sorted);
for(String v:sorted)
toReturn.append(v+",");
return toReturn.substring(0, toReturn.length()-1);
}
else return "";
}
}
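
Each Query constant pairs its SQL text with the DBField[] that lists the positional parameters, and translateRow() maps a ResultSet row back onto an ExecutionDescriptor. A usage sketch combining the two; Query.getQuery() is an assumed accessor (the Query class is not in this diff) and executionId is a placeholder:

// Sketch only; the enclosing method is assumed to declare SQLException and InternalError.
try (Connection conn = connectionManager.getConnection();
     PreparedStatement ps = conn.prepareStatement(Queries.GET_BY_ID.getQuery())) {
    ps.setLong(1, executionId);                 // single parameter: fields.get(ID)
    try (ResultSet rs = ps.executeQuery()) {
        if (rs.next()) {
            ExecutionDescriptor descriptor = Queries.translateRow(rs);
            // ... use descriptor
        }
    }
    conn.commit();                              // connections arrive with autoCommit=false
}

One caveat: ACQUIRE and GET_SIMILAR concatenate ExecutionStatus values straight into the SQL, which for a VARCHAR status column yields an unquoted literal (e.g. status=RUNNING); quoting the value or binding it as a parameter avoids that.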

View File

@@ -1,4 +1,4 @@
package org.gcube.data.publishing.gCatFeeder.service.engine.impl;
package org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence;
import org.gcube.data.publishing.gCatFeeder.service.engine.Storage;
import org.gcube.data.publishing.gCatFeeder.service.model.fault.InternalError;

View File

@@ -1,5 +1,6 @@
package org.gcube.data.publishing.gCatFeeder.service.model;
import java.time.Instant;
import java.util.Set;
@@ -17,9 +18,22 @@ public class ExecutionDescriptor {
private ExecutionStatus status;
private String reportUrl;
private Instant startTime;
private Instant endTime;
public Instant getStartTime() {
return startTime;
}
public void setStartTime(Instant startTime) {
this.startTime = startTime;
}
public Instant getEndTime() {
return endTime;
}
public void setEndTime(Instant endTime) {
this.endTime = endTime;
}
public Long getId() {
return id;
}

View File

@@ -8,6 +8,10 @@ public class ExecutionRequest {
private Set<String> toInvokeCollectors=new HashSet<>();
private Set<String> toInvokeControllers=new HashSet<>();
private String callerID;
private String context;
private String encryptedToken;
public ExecutionRequest addCollectorId(String id) {
this.toInvokeCollectors.add(id);
@@ -36,4 +40,36 @@ public class ExecutionRequest {
public void setToInvokeControllers(Set<String> toInvokeControllers) {
this.toInvokeControllers = toInvokeControllers;
}
public String getCallerID() {
return callerID;
}
public void setCallerID(String callerID) {
this.callerID = callerID;
}
public String getContext() {
return context;
}
public void setContext(String context) {
this.context = context;
}
public String getEncryptedToken() {
return encryptedToken;
}
public void setEncryptedToken(String encryptedToken) {
this.encryptedToken = encryptedToken;
}
}
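
The three new fields carry the caller's identity, context and encrypted token along with the request; they are filled in by the Executions resource in the last hunk below. A compact sketch of assembling a request the same way, with a placeholder collector id:

ExecutionRequest request = new ExecutionRequest();
request.addCollectorId("example-collector");                         // placeholder id
request.addControllerId(ServiceConstants.Executions.DEFAULT_VALUE);
String token = infrastructure.getCurrentToken();
request.setCallerID(infrastructure.getClientID(token));
request.setContext(infrastructure.getCurrentContext());
request.setEncryptedToken(infrastructure.encrypt(token));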

View File

@@ -18,6 +18,7 @@ import javax.ws.rs.core.UriInfo;
import org.gcube.data.publishing.gCatFeeder.service.GCatFeederManager;
import org.gcube.data.publishing.gCatFeeder.service.ServiceConstants;
import org.gcube.data.publishing.gCatFeeder.service.engine.FeederEngine;
import org.gcube.data.publishing.gCatFeeder.service.engine.Infrastructure;
import org.gcube.data.publishing.gCatFeeder.service.engine.impl.persistence.DBQueryDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionDescriptor;
import org.gcube.data.publishing.gCatFeeder.service.model.ExecutionRequest;
@@ -38,6 +39,9 @@ public class Executions {
@Inject
private FeederEngine engine;
@Inject
private Infrastructure infrastructure;
@POST
@Produces(MediaType.APPLICATION_JSON)
@@ -56,6 +60,10 @@ public class Executions {
request.addControllerId(catalogue);
else request.addControllerId(ServiceConstants.Executions.DEFAULT_VALUE);
String token=infrastructure.getCurrentToken();
request.setCallerID(infrastructure.getClientID(token));
request.setContext(infrastructure.getCurrentContext());
request.setEncryptedToken(infrastructure.encrypt(token));
log.trace("Submitting request {} ",request);