Fabio Sinibaldi 2019-01-17 13:37:57 +00:00
parent a02c0b5846
commit 754a15318e
8 changed files with 425 additions and 158 deletions

View File

@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
public class ImporterMonitor implements DMMonitorListener {
private static final Logger log= LoggerFactory.getLogger(ImporterMonitor.class);
public ImporterMonitor(ImportRoutineDescriptor routine, ISQueryDescriptor isQuery) {
super();
this.routine = routine;
@@ -32,7 +32,7 @@ public class ImporterMonitor implements DMMonitorListener {
private ImportRoutineDescriptor routine;
private ISQueryDescriptor isQuery;
@Override
public void accepted() {
updateStatus(ImportStatus.ACCEPTED,routine,isQuery);
@@ -59,41 +59,45 @@ public class ImporterMonitor implements DMMonitorListener {
updateStatus(ImportStatus.RUNNING,routine,isQuery);
}
private static final void updateStatus(ImportStatus status,ImportRoutineDescriptor routine,ISQueryDescriptor is) {
try{
log.debug("Updateing status {} for {} ",status,routine);
DataBaseManager db=DataBaseManager.get(is);
Connection conn=db.getConnection();
conn.setAutoCommit(true);
Instant endTime=null;
switch(status) {
case CANCELLED:
case COMPLETE :
case FAILED : endTime=Instant.now();
}
DBQueryDescriptor queryValues=new DBQueryDescriptor().
add(DBField.ImportRoutine.fields.get(ImportRoutine.ID), routine.getId()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.STATUS), status.toString()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.END), endTime);
PreparedStatement psUpdate=Queries.UPDATE_IMPORT_STATUS.get(conn, queryValues);
psUpdate.executeUpdate();
DataBaseManager db=DataBaseManager.get(is);
Connection conn=db.getConnection();
try {
conn.setAutoCommit(true);
Instant endTime=null;
switch(status) {
case CANCELLED:
case COMPLETE :
case FAILED : endTime=Instant.now();
}
DBQueryDescriptor queryValues=new DBQueryDescriptor().
add(DBField.ImportRoutine.fields.get(ImportRoutine.ID), routine.getId()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.STATUS), status.toString()).
add(DBField.ImportRoutine.fields.get(ImportRoutine.END), endTime);
PreparedStatement psUpdate=Queries.UPDATE_IMPORT_STATUS.get(conn, queryValues);
psUpdate.executeUpdate();
}finally {
conn.close();
}
}catch(Throwable t) {
log.warn("Unable to update status on database");
}
}
private static final void loadOutputData(ImportRoutineDescriptor routine, ISQueryDescriptor is) {
log.debug("Loading output data for {} ",routine);
ComputationId id=DMUtils.getComputation(routine);
Map<String,String> outputFiles=DMUtils.getOutputFiles(id);
// if(outputFiles.containsKey(Local))
}
}
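Note on the pattern introduced above: updateStatus now guards conn.close() with try/finally. Since java.sql.Connection is AutoCloseable (Java 7+), the same guarantee can be written with try-with-resources; a minimal sketch using the same names, not a change to the committed code:

// Sketch only: same behavior as updateStatus above, with the close handled
// by try-with-resources instead of an explicit finally block.
private static final void updateStatus(ImportStatus status, ImportRoutineDescriptor routine, ISQueryDescriptor is) {
try(Connection conn = DataBaseManager.get(is).getConnection()){
conn.setAutoCommit(true);
// ... build queryValues and execute UPDATE_IMPORT_STATUS as above ...
}catch(Throwable t) {
log.warn("Unable to update status on database",t);
}
}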

View File

@@ -68,9 +68,9 @@ public class DataBaseManagerImpl implements DataBaseManager{
@Override
public Connection getConnection() throws SQLException, InternalException {
DataSource ds=getDataSource();
Connection conn=ds.getConnection();
conn.setAutoCommit(false);
return conn;
}
@@ -78,8 +78,7 @@ public class DataBaseManagerImpl implements DataBaseManager{
private synchronized DataSource getDataSource() throws InternalException {
DatabaseConnectionDescriptor dbDescriptor=getDB();
if(!datasources.containsKey(dbDescriptor.getUrl())) {
datasources.put(dbDescriptor.getUrl(), setupDataSource(dbDescriptor));
}
return datasources.get(dbDescriptor.getUrl());
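getDataSource() keeps one DataSource per JDBC URL behind a synchronized check-then-put. With a ConcurrentHashMap the same guarantee would hold without locking the whole method; a sketch, assuming setupDataSource is adapted to throw an unchecked exception so it can be called from the lambda:

// Sketch: lock-free per-URL caching (assumption: setupDataSource wraps its
// checked failures in a RuntimeException).
private static final ConcurrentHashMap<String,DataSource> datasources=new ConcurrentHashMap<>();

private DataSource getDataSource() throws InternalException {
DatabaseConnectionDescriptor dbDescriptor=getDB();
return datasources.computeIfAbsent(dbDescriptor.getUrl(), url -> setupDataSource(dbDescriptor));
}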

View File

@@ -0,0 +1,68 @@
package org.gcube.application.perform.service.engine.impl;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportedTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ImportedTableManager {
private static final Logger log= LoggerFactory.getLogger(ImportedTableManager.class);
private static List<ImportedTable> tables;
public void loadImportedData(ImportRoutineDescriptor desc) {
}
// NB: the insert loop below needs an open Connection, so it is passed in by the caller.
private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException {
Reader in = new FileReader(path);
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
try {
log.debug("Parsing file {} : {} ",description,path);
// Extract CSV Schema
ArrayList<String> csvSchema=new ArrayList<String>();
for(Entry<String,Integer> entry : parser.getHeaderMap().entrySet()) {
csvSchema.add(entry.getValue(), entry.getKey());
}
log.debug("CSV Schema is {} ",csvSchema);
long counter=0l;
//Get the right table
for(ImportedTable table:tables) {
if(table.matchesSchema(csvSchema)) {
log.debug("Mathing table is {} ",table.getTableName());
Query query=table.getQuery();
PreparedStatement psInsert=query.prepare(conn);
log.debug("Reading csvLines");
for(CSVRecord record:parser) {
DBQueryDescriptor desc=table.getSetRow(record.toMap(), routine.getId());
query.fill(psInsert, desc);
counter+=psInsert.executeUpdate();
}
log.debug("Inserted {} lines into {} for routine {} [FARM ID {}]",counter,table.getTableName(),routine.getId(),routine.getFarmId());
}
}
return counter;
}finally {
parser.close();
in.close();
}
}
}
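The schema extraction in parse() inverts getHeaderMap() (column name to index) into a position-ordered list. This relies on commons-csv returning the header map in column order, which it does; a standalone illustration:

// Illustration: for a header line "a,b,c", getHeaderMap() yields
// {a=0, b=1, c=2} in column order, so positional add() rebuilds [a, b, c].
try(Reader in=new FileReader("sample.csv");
CSVParser parser=CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in)){
ArrayList<String> schema=new ArrayList<>();
for(Entry<String,Integer> e : parser.getHeaderMap().entrySet())
schema.add(e.getValue(), e.getKey());
System.out.println(schema); // [a, b, c]
}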

View File

@@ -39,12 +39,12 @@ import org.slf4j.LoggerFactory;
public class ImporterImpl implements Importer {
private static final Logger log= LoggerFactory.getLogger(ImporterImpl.class);
private static ISQueryDescriptor isQueryDescriptor=null;
private static synchronized ISQueryDescriptor getISQueryDescriptor() {
if(isQueryDescriptor==null) {
isQueryDescriptor=
@@ -54,67 +54,70 @@ public class ImporterImpl implements Importer {
}
return isQueryDescriptor;
}
private static final String getHostname() {
try{
ApplicationContext context=ContextProvider.get();
ContainerConfiguration configuration=context.container().configuration();
return configuration.hostname();
}catch(Throwable t) {
log.warn("UNABLE TO GET HOSTNAME. This should happen only in debug mode.");
return "localhost";
}
}
public void init() throws InternalException{
try {
log.info("Initializing IMPORTER");
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
conn.setAutoCommit(true);
PreparedStatement psOrphans=Queries.ORPHAN_IMPORTS.prepare(conn);
PreparedStatement psAcquire=Queries.ACQUIRE_IMPORT_ROUTINE.prepare(conn);
// set ps
ResultSet rsOrphans=psOrphans.executeQuery();
long monitoredCount=0l;
while(rsOrphans.next()) {
Long id=rsOrphans.getLong(ImportRoutine.ID);
log.info("Initializing IMPORTER");
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
try {
ImportRoutineDescriptor desc=Queries.rowToDescriptor(rsOrphans);
String hostname=getHostname();
DBQueryDescriptor acquireDesc=new DBQueryDescriptor().
add(ImportRoutine.fields.get(ImportRoutine.LOCK), hostname).
add(ImportRoutine.fields.get(ImportRoutine.ID), id);
Queries.ACQUIRE_IMPORT_ROUTINE.fill(psAcquire, acquireDesc);
if(psAcquire.executeUpdate()>0) {
log.debug("Acquired {} ",id);
// Stored caller token
log.debug("Setting stored token.. ");
SecurityTokenProvider.instance.set(CommonUtils.decryptString(desc.getCaller()));
monitor(desc);
monitoredCount++;
conn.setAutoCommit(true);
PreparedStatement psOrphans=Queries.ORPHAN_IMPORTS.prepare(conn);
PreparedStatement psAcquire=Queries.ACQUIRE_IMPORT_ROUTINE.prepare(conn);
// set ps
ResultSet rsOrphans=psOrphans.executeQuery();
long monitoredCount=0l;
while(rsOrphans.next()) {
Long id=rsOrphans.getLong(ImportRoutine.ID);
try {
ImportRoutineDescriptor desc=Queries.rowToDescriptor(rsOrphans);
String hostname=getHostname();
DBQueryDescriptor acquireDesc=new DBQueryDescriptor().
add(ImportRoutine.fields.get(ImportRoutine.LOCK), hostname).
add(ImportRoutine.fields.get(ImportRoutine.ID), id);
Queries.ACQUIRE_IMPORT_ROUTINE.fill(psAcquire, acquireDesc);
if(psAcquire.executeUpdate()>0) {
log.debug("Acquired {} ",id);
// Stored caller token
log.debug("Setting stored token.. ");
SecurityTokenProvider.instance.set(CommonUtils.decryptString(desc.getCaller()));
monitor(desc);
monitoredCount++;
}
}catch(Throwable t) {
log.warn("Unable to monitor orphan with ID {} ",id,t);
}
}
log.info("Acquired {} import executions for monitoring",monitoredCount);
}finally {
conn.close();
}
}catch(Throwable t) {
log.warn("Unable to monitor orphan with ID {} ",id,t);
}
}
log.info("Acquired {} import executions for monitoring",monitoredCount);
}catch(Throwable t) {
log.warn("Unexpected Error while trying to check orphan import routines");
throw new InternalException(t);
}
}
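The orphan-recovery loop above claims each routine by writing this host's name into its LOCK field, and only monitors the routine when executeUpdate() reports a changed row, so concurrent service instances cannot adopt the same routine twice. The ACQUIRE_IMPORT_ROUTINE statement itself is not part of this diff; a plausible shape, with hypothetical table and column names:

// Hypothetical illustration of the acquire semantics (not the actual Queries constant):
// only the first host to run this wins the row; every later attempt updates 0 rows.
private static final String ACQUIRE_SQL=
"UPDATE import_routine SET lock=? WHERE id=? AND lock IS NULL";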
@Override
public ImportRoutineDescriptor importExcel(ImportRequest request) throws DMException, SQLException, InternalException {
log.debug("Submitting {} ",request);
@@ -125,17 +128,17 @@ public class ImporterImpl implements Importer {
monitor(desc);
return getDescriptorById(desc.getId());
}
private void monitor(ImportRoutineDescriptor desc) throws DMException {
log.debug("Monitoring {} ",desc);
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(desc,getISQueryDescriptor()));
}
private ComputationId submit(ImportRequest request) throws DMException {
/**
* dataminer-prototypes.d4science.org/wps/WebProcessingService?
@@ -147,17 +150,17 @@ public class ImporterImpl implements Importer {
* BatchType=GROW_OUT_AGGREGATED;
* FarmID=ID
*/
Map<String,String> parameters=new HashMap<>();
parameters.put("InputData", request.getSource());
parameters.put("BatchType", request.getBatchType());
parameters.put("FarmID", request.getFarmId().toString());
return DMUtils.submitJob(LocalConfiguration.getProperty(LocalConfiguration.IMPORTER_COMPUTATION_ID), parameters);
}
private ImportRoutineDescriptor register(ComputationId computationId,ImportRequest request) throws SQLException, InternalException {
DBQueryDescriptor insertionRow=new DBQueryDescriptor().
add(ImportRoutine.fields.get(ImportRoutine.BATCH_TYPE), request.getBatchType()).
add(ImportRoutine.fields.get(ImportRoutine.CALLER), CommonUtils.encryptString(ScopeUtils.getCaller())).
@@ -172,37 +175,42 @@ public class ImporterImpl implements Importer {
add(ImportRoutine.fields.get(ImportRoutine.SOURCE_VERSION), request.getVersion()).
add(ImportRoutine.fields.get(ImportRoutine.START), java.sql.Timestamp.from(Instant.now())).
add(ImportRoutine.fields.get(ImportRoutine.STATUS),ImportStatus.ACCEPTED.toString());
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
conn.setAutoCommit(true);
try {
PreparedStatement ps=Queries.INSERT_ROUTINE.prepare(conn,Statement.RETURN_GENERATED_KEYS);
Queries.INSERT_ROUTINE.fill(ps, insertionRow);
ps.executeUpdate();
ResultSet rs=ps.getGeneratedKeys();
rs.next();
PreparedStatement psGet=Queries.GET_IMPORT_ROUTINE_BY_ID.get(conn,
new DBQueryDescriptor().add(ImportRoutine.fields.get(ImportRoutine.ID), rs.getLong(ImportRoutine.ID)));
ResultSet rsGet=psGet.executeQuery();
rsGet.next();
return Queries.rowToDescriptor(rsGet);
}finally {
conn.close();
}
}
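register() reads the new id via rs.getLong(ImportRoutine.ID), i.e. by column label. JDBC drivers differ in how they label generated-key columns (some expose only column 1), so reading by position is the portable variant; a small sketch:

// Portability note: some drivers expose generated keys only as column 1.
ResultSet rs=ps.getGeneratedKeys();
rs.next();
long generatedId=rs.getLong(1); // instead of rs.getLong(ImportRoutine.ID)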
private ImportRoutineDescriptor getDescriptorById(Long id) throws SQLException, InternalException {
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
try {
PreparedStatement ps=Queries.GET_IMPORT_ROUTINE_BY_ID.get(conn,
new DBQueryDescriptor().add(ImportRoutine.fields.get(ImportRoutine.ID), id));
ps.setLong(1, id);
ResultSet rs=ps.executeQuery();
if(rs.next()) return Queries.rowToDescriptor(rs);
else throw new BeanNotFound("Unable to find Routine with ID "+id);
}finally {
conn.close();
}
}
@@ -210,21 +218,24 @@ public class ImporterImpl implements Importer {
public List<ImportRoutineDescriptor> getDescriptors(DBQueryDescriptor desc) throws SQLException, InternalException {
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
try {
PreparedStatement ps=Queries.FILTER_IMPORTS.get(conn, desc);
ResultSet rs=ps.executeQuery();
ArrayList<ImportRoutineDescriptor> toReturn=new ArrayList<>();
while (rs.next())
toReturn.add(Queries.rowToDescriptor(rs));
return toReturn;
}finally {
conn.close();
}
}
}

View File

@@ -26,7 +26,7 @@ public class MappingManagerImpl implements MappingManager {
private static ISQueryDescriptor isQueryDescriptor=null;
private static synchronized ISQueryDescriptor getISQueryDescriptor() {
if(isQueryDescriptor==null) {
@@ -43,27 +43,31 @@ public class MappingManagerImpl implements MappingManager {
public Batch getBatch(DBQueryDescriptor desc) throws SQLException, InternalException{
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
Connection conn=db.getConnection();
try{
conn.setAutoCommit(true);
Query getQuery=Queries.GET_BATCH_BY_DESCRIPTIVE_KEY;
PreparedStatement psSearch=getQuery.get(conn, desc);
ResultSet rs=psSearch.executeQuery();
if(rs.next())
return Queries.rowToBatch(rs);
// ID NOT FOUND, TRY TO REGISTER IT
log.trace("Registering new Batch from condition {}",desc);
desc.add(DBField.Batch.fields.get(DBField.Batch.UUID), UUID.randomUUID());
PreparedStatement psInsert=Queries.INSERT_BATCH.get(conn, desc);
psInsert.executeUpdate();
rs=psSearch.executeQuery();
if(rs.next())
return Queries.rowToBatch(rs);
else throw new BeanNotFound(String.format("Unable to find Bean with %s",desc));
}finally {
conn.close();
}
}
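getBatch() is a get-or-insert: search, insert on miss, search again. Two concurrent callers can both miss and both insert, so uniqueness ultimately depends on a database constraint. Where the backend is PostgreSQL, the insert could be made race-proof with an upsert; a hypothetical sketch, table and column names illustrative:

// Hypothetical: let the database arbitrate concurrent registrations.
private static final String INSERT_BATCH_SQL=
"INSERT INTO batches (uuid, name, type, farm_id) VALUES (?,?,?,?) "+
"ON CONFLICT DO NOTHING";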
@@ -73,7 +77,8 @@ public class MappingManagerImpl implements MappingManager {
Connection conn=db.getConnection();
try{
PreparedStatement psGet=null;
DBField IDField=DBField.Farm.fields.get(DBField.Farm.FARM_ID);
if(desc.getCondition().containsKey(IDField)) {
psGet=Queries.GET_FARM_BY_ID.get(conn, desc);
@@ -84,6 +89,9 @@ public class MappingManagerImpl implements MappingManager {
throw new BeanNotFound("Farm not found. Condition was "+desc);
return Queries.rowToFarm(rs);
}finally {
conn.close();
}
}

View File

@@ -0,0 +1,38 @@
package org.gcube.application.perform.service.engine.impl;
import java.util.Map;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
public class PerformanceManagerImpl implements PerformanceManager{
@Override
public String generateCSV(CSVExportRequest request) {
throw new RuntimeException("Not yet implemented");
}
@Override
public void loadOutputData(ImportRoutineDescriptor desc) {
// TODO Auto-generated method stub
ComputationId id= DMUtils.getComputation(desc);
Map<String,String> outputFiles=DMUtils.getOutputFiles(id);
}
}
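loadOutputData currently only fetches the computation's output-file map. Presumably each entry (logical output name to file reference) will be routed to the import machinery in a later commit; a minimal sketch of walking the map as returned by DMUtils.getOutputFiles:

// Sketch: iterate the DataMiner outputs; the actual handling is still to be written.
for(Map.Entry<String,String> output : outputFiles.entrySet()){
System.out.println("Output "+output.getKey()+" -> "+output.getValue());
}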

View File

@@ -0,0 +1,98 @@
package org.gcube.application.perform.service.engine.model.importer;
import java.io.File;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import org.gcube.application.perform.service.engine.impl.Query;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
public class ImportedTable {
/**
* CSV FILE labels -> DBField
*/
private Map<String,DBField> mappings;
private ArrayList<String> csvFields;
private String tablename;
private DBField routineIdField;
private Query query;
/**
* Checks if the passed set of labels matches this table's expected CSV schema.
*
* @param toMatchSchema the CSV header labels, in column order
* @return true if the schemas are equal
*/
public boolean matchesSchema(ArrayList<String> toMatchSchema) {
return csvFields.equals(toMatchSchema);
}
public DBQueryDescriptor getSetRow(Map<String,String> csvRow, Long routineId) {
DBQueryDescriptor desc=new DBQueryDescriptor();
for(Entry<String,String> csvField:csvRow.entrySet()) {
DBField toSetField=mappings.get(csvField.getKey());
Object value=csvField.getValue();
switch(toSetField.getType()) {
case Types.BIGINT : value=Long.parseLong((String) value);
break;
case Types.REAL : value=Double.parseDouble((String) value);
break;
}
desc.add(toSetField, value);
}
desc.add(routineIdField, routineId);
return desc;
}
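getSetRow coerces the raw CSV strings according to the declared java.sql.Types of the mapped column; a worked example with hypothetical field names:

// Assume mappings has "count" -> a BIGINT DBField and "weight" -> a REAL DBField.
Map<String,String> csvRow=new HashMap<>();
csvRow.put("count","3"); // coerced to Long 3L
csvRow.put("weight","12.5"); // coerced to Double 12.5
DBQueryDescriptor desc=table.getSetRow(csvRow, 42l); // 42l: routine id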
public Query getQuery() {
return query;
}
public String getTableName() {
return tablename;
}
public File exportCSV(CSVExportRequest request) {
// USE CASE to replace values in
/**
* SELECT
CASE status
WHEN 'VS' THEN 'validated by subsidiary'
WHEN 'NA' THEN 'not acceptable'
WHEN 'D' THEN 'delisted'
ELSE 'validated'
END AS STATUS
FROM SUPP_STATUS
*/
// get de-anonymized labels
// Create query
// Pass the result set to the CSV printer, specifying labels
// final Appendable out = ...;
// final CSVPrinter printer = CSVFormat.DEFAULT.withHeader("H1", "H2").print(out)
// return file
throw new RuntimeException("Not implemented");
}
}
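The commented plan in exportCSV pairs a label-translating SELECT ... CASE with commons-csv printing. A minimal printing sketch consistent with those comments (headers, writer target and result set are placeholders):

// Sketch: stream a JDBC ResultSet out as CSV with explicit header labels.
try(Writer out=new FileWriter(target);
CSVPrinter printer=CSVFormat.DEFAULT.withHeader("H1","H2").print(out)){
while(rs.next())
printer.printRecord(rs.getString(1), rs.getString(2));
}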

View File

@@ -1,41 +1,82 @@
package org.gcube.application.perform.service;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.junit.Test;
public class AnagraphicTests extends CommonTest{
private static int CALLS=100;
@Test
public void getBatch() {
WebTarget target=
// target(ServiceConstants.SERVICE_NAME).
// path(ServiceConstants.APPLICATION_PATH).
target(ServiceConstants.Mappings.PATH).
path(ServiceConstants.Mappings.BATCHES_METHOD).
queryParam(ServiceConstants.Mappings.BATCH_NAME_PARAMETER, "gino").
queryParam(ServiceConstants.Mappings.BATCH_TYPE_PARAMETER, "pino").
queryParam(ServiceConstants.Mappings.FARM_ID_PARAMETER, 12682549);
System.out.println(target.getUri());
Response resp=target.request().get();
System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
}
@Test
public void getFarm() {
WebTarget target=
// target(ServiceConstants.SERVICE_NAME).
// path(ServiceConstants.APPLICATION_PATH).
target(ServiceConstants.Mappings.PATH).
path(ServiceConstants.Mappings.FARM_METHOD).
queryParam(ServiceConstants.Mappings.FARM_ID_PARAMETER, 126825).
queryParam(ServiceConstants.Mappings.FARM_UUID_PARAMETER, "pino");
System.out.println(target.getUri());
Response resp=target.request().get();
System.out.println(resp.getStatus() + " : "+ resp.readEntity(String.class));
}
@Test
public void parallelRequests() {
final AtomicLong currentExecution=new AtomicLong(0);
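// Start at -(CALLS-1) permits: acquire() below unblocks only after all CALLS threads release.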
final Semaphore sem=new Semaphore((CALLS-1)*(-1));
final WebTarget farmTarget=target(ServiceConstants.Mappings.PATH).
path(ServiceConstants.Mappings.FARM_METHOD).
queryParam(ServiceConstants.Mappings.FARM_ID_PARAMETER, 126825).
queryParam(ServiceConstants.Mappings.FARM_UUID_PARAMETER, "pino");
final WebTarget batchesTarget=target(ServiceConstants.Mappings.PATH).
path(ServiceConstants.Mappings.BATCHES_METHOD).
queryParam(ServiceConstants.Mappings.BATCH_NAME_PARAMETER, "gino").
queryParam(ServiceConstants.Mappings.BATCH_TYPE_PARAMETER, "pino").
queryParam(ServiceConstants.Mappings.FARM_ID_PARAMETER, 12682549);
for(int i=0;i<CALLS;i++) {
Thread t=new Thread() {
@Override
public void run() {
farmTarget.request().get();
batchesTarget.request().get();
System.out.println("Performed "+currentExecution.incrementAndGet());
sem.release();
}
};
t.setDaemon(true);
t.start();
}
try {
sem.acquire();
System.out.println("COMPLETED");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}