Fabio Sinibaldi 2019-01-21 17:01:54 +00:00
parent 754a15318e
commit ff1dcee328
23 changed files with 813 additions and 243 deletions

View File

@ -19,10 +19,10 @@ public class LocalConfiguration {
public static final String MAPPING_DB_ENDPOINT_NAME="mapping-db.ep.name";
public static final String MAPPING_DB_ENDPOINT_CATEGORY="mapping-db.ep.category";
public static final String PERFORMANCE_DB_ENDPOINT_NAME="performance-db.ep.name";
public static final String PERFORMANCE_DB_ENDPOINT_CATEGORY="performance-db.ep.category";
//
// public static final String PERFORMANCE_DB_ENDPOINT_NAME="performance-db.ep.name";
// public static final String PERFORMANCE_DB_ENDPOINT_CATEGORY="performance-db.ep.category";
//
public static final String IMPORTER_COMPUTATION_ID="dm.importer.computationid";
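For reference, a WEB-INF/config.properties consistent with these keys could look as follows; the endpoint names, categories and computation id below are placeholders, not the actual deployment values.

# IS endpoint used to resolve the mapping database (placeholder values)
mapping-db.ep.name=perform-mapping-db
mapping-db.ep.category=Database
# IS endpoint used to resolve the performance database (placeholder values)
performance-db.ep.name=perform-performance-db
performance-db.ep.category=Database
# DataMiner computation invoked by the importer (placeholder value)
dm.importer.computationid=PERFORMFISH_IMPORTER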

View File

@ -1,25 +1,87 @@
package org.gcube.application.perform.service;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent.Start;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent.Stop;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.sql.SQLException;
import java.util.Properties;
import javax.servlet.ServletContext;
import javax.xml.bind.annotation.XmlRootElement;
import org.gcube.application.perform.service.engine.impl.ImportedTableManager;
import org.gcube.application.perform.service.engine.impl.SchemaDefinition;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
import org.gcube.smartgears.ContextProvider;
import org.gcube.smartgears.context.application.ApplicationContext;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent.Start;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleEvent.Stop;
import org.gcube.smartgears.handlers.application.ApplicationLifecycleHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@XmlRootElement(name = "perform-lifecycle")
public class PerformServiceLifecycleManager extends ApplicationLifecycleHandler {
private static final Logger log= LoggerFactory.getLogger(PerformServiceLifecycleManager.class);
@Override
public void onStart(Start e) {
public void onStart(Start e) {
try{
ApplicationContext context=ContextProvider.get();
URL resourceUrl = context.application().getResource("/WEB-INF/config.properties");
LocalConfiguration.init(resourceUrl);
ServletContext ctx=ContextProvider.get().application();
String webinfPath=ctx.getRealPath("/WEB-INF");
initSchema(webinfPath);
}catch(Exception ex) {
throw new RuntimeException("Unable to init",ex);
}
super.onStart(e);
}
@Override
public void onStop(Stop e) {
super.onStop(e);
}
/**
* Loads analysis schema definitions from webinfPath/schema/[analysis_id]/*.properties;
* each properties file references its CSV template via the "csv" property.
* @throws IOException
* @throws InternalException
* @throws SQLException
*/
static final void initSchema(String webinfPath) throws IOException, SQLException, InternalException {
String configurationPath=webinfPath+"/schema";
log.info("Reading Analysis Configuration from {} ",configurationPath);
File folder=new File(configurationPath);
for(File analysisFolder:folder.listFiles()) {
String analysisName=analysisFolder.getName();
AnalysisType type=new AnalysisType(analysisName, analysisName);
log.info("Reading from "+analysisFolder.getPath());
for(File schemaFile:analysisFolder.listFiles()) {
FileInputStream fis=null;
try {
Properties props=new Properties();
fis=new FileInputStream(schemaFile);
props.load(fis);
SchemaDefinition schema=new SchemaDefinition(type,props);
ImportedTableManager.importSchema(schema,webinfPath);
}finally {
if(fis!=null) fis.close();
}
}
}
}
}
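In practice initSchema expects a layout like the one below under WEB-INF; only the schema root, the per-analysis subfolders and the CSV files referenced by the csv property are mandated by the code, the concrete folder and file names here are illustrative.

WEB-INF/
  config.properties
  schema/
    GROW_OUT_AGGREGATED/                    (analysis id, one folder per AnalysisType)
      BatchesTable.properties               (one SchemaDefinition per properties file)
  csv/
    Grow_out_Aggregated_Batch_Data_Entry_KPI.csv   (referenced by the csv property)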

View File

@ -22,9 +22,6 @@ public class PerformServiceManager implements ApplicationManager{
@Override
public void onInit() {
try {
ApplicationContext context=ContextProvider.get();
URL resourceUrl = context.application().getResource("/WEB-INF/config.properties");
LocalConfiguration.init(resourceUrl);
new ImporterImpl().init();

View File

@ -4,14 +4,13 @@ import java.sql.Connection;
import java.sql.SQLException;
import org.gcube.application.perform.service.engine.impl.DataBaseManagerImpl;
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
public interface DataBaseManager {
public static DataBaseManager get(ISQueryDescriptor desc) {
return new DataBaseManagerImpl(desc);
public static DataBaseManager get() {
return new DataBaseManagerImpl();
}
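With the descriptor argument gone, callers resolve the database from the current scope only; a minimal usage sketch, assuming (as elsewhere in this commit) that the caller closes the pooled connection:

DataBaseManager db=DataBaseManager.get();    // scope-aware, no ISQueryDescriptor needed
Connection conn=db.getConnection();
try {
    conn.setAutoCommit(true);
    // ... statements against the mapping DB ...
} finally {
    conn.close();
}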

View File

@ -23,10 +23,9 @@ public class ImporterMonitor implements DMMonitorListener {
private static final Logger log= LoggerFactory.getLogger(ImporterMonitor.class);
public ImporterMonitor(ImportRoutineDescriptor routine, ISQueryDescriptor isQuery) {
public ImporterMonitor(ImportRoutineDescriptor routine) {
super();
this.routine = routine;
this.isQuery = isQuery;
this.routine = routine;
}
@ -35,36 +34,36 @@ public class ImporterMonitor implements DMMonitorListener {
@Override
public void accepted() {
updateStatus(ImportStatus.ACCEPTED,routine,isQuery);
updateStatus(ImportStatus.ACCEPTED,routine);
}
@Override
public void cancelled() {
updateStatus(ImportStatus.CANCELLED,routine,isQuery);
updateStatus(ImportStatus.CANCELLED,routine);
}
@Override
public void complete(double percentage) {
updateStatus(ImportStatus.COMPLETE,routine,isQuery);
updateStatus(ImportStatus.COMPLETE,routine);
loadOutputData(routine,isQuery);
}
@Override
public void failed(String message, Exception exception) {
updateStatus(ImportStatus.FAILED,routine,isQuery);
updateStatus(ImportStatus.FAILED,routine);
}
@Override
public void running(double percentage) {
updateStatus(ImportStatus.RUNNING,routine,isQuery);
updateStatus(ImportStatus.RUNNING,routine);
}
private static final void updateStatus(ImportStatus status,ImportRoutineDescriptor routine,ISQueryDescriptor is) {
private static final void updateStatus(ImportStatus status,ImportRoutineDescriptor routine) {
try{
log.debug("Updateing status {} for {} ",status,routine);
DataBaseManager db=DataBaseManager.get(is);
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
try {
conn.setAutoCommit(true);

View File

@ -20,6 +20,7 @@ import org.gcube.application.perform.service.engine.model.DatabaseConnectionDesc
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.utils.ISUtils;
import org.gcube.application.perform.service.engine.utils.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,25 +46,25 @@ public class DataBaseManagerImpl implements DataBaseManager{
// Endpoint -> datasource
private static ConcurrentHashMap<String,DataSource> datasources=new ConcurrentHashMap<>();
// Scope -> db
private static ConcurrentHashMap<String,DatabaseConnectionDescriptor> databases=new ConcurrentHashMap<>();
/**
* Manages db connection pools by scope
*
*
*/
private ISQueryDescriptor queryDescriptor;
private DatabaseConnectionDescriptor dbDescriptor=null;
public DataBaseManagerImpl(ISQueryDescriptor query) {
queryDescriptor=query;
}
private synchronized DatabaseConnectionDescriptor getDB() throws InternalException {
if(dbDescriptor==null) {
dbDescriptor=ISUtils.queryForDatabase(queryDescriptor);
}
return dbDescriptor;
if(!databases.containsKey(ScopeUtils.getCurrentScope()))
databases.put(ScopeUtils.getCurrentScope(), ISUtils.queryForDatabase(new ISQueryDescriptor(
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY))));
return databases.get(ScopeUtils.getCurrentScope());
}
@Override

View File

@ -0,0 +1,68 @@
package org.gcube.application.perform.service.engine.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
public class ExportCSVQuery extends Query {
private Map<String,Map<String,String>> mappings=new HashMap<>();
private String fieldList;
private String tablename;
public ExportCSVQuery(String query, DBField[] fields) {
super(query, fields);
// TODO Auto-generated constructor stub
}
public void setMapping(String field, Map<String,String> mapping) {
mappings.put(field, mapping);
}
public void setSelectionFilters(CSVExportRequest req) {
throw new RuntimeException("Impememt this");
}
private String getConditionString() {
throw new RuntimeException("Impememt this");
}
public void setTablename(String tablename) {
this.tablename = tablename;
}
public void setFieldList(Collection<DBField> fields) {
StringBuilder b=new StringBuilder();
for(DBField f:fields)
b.append(f.getFieldName()+",");
// drop only the trailing comma (substring end index is exclusive)
fieldList=b.substring(0,b.lastIndexOf(","));
}
@Override
public String getQuery() {
String selectedFields=fieldList;
for(Entry<String,Map<String,String>> mapping:mappings.entrySet()) {
StringBuilder caseBuilder=new StringBuilder("CASE "+mapping.getKey()+" ");
for(Entry<String,String> condition: mapping.getValue().entrySet())
caseBuilder.append(String.format("WHEN '%1$s' THEN '%2$s' ", condition.getKey(),condition.getValue()));
caseBuilder.append("END AS "+mapping.getKey());
// String#replaceAll returns a new string: reassign, otherwise the CASE expression is lost
selectedFields=selectedFields.replaceAll(mapping.getKey(), caseBuilder.toString());
}
return String.format("SELECT %1$s FROM %2$s WHERE %3$s",
selectedFields, tablename, getConditionString());
}
}
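To illustrate the intended effect of the CASE mappings on the SELECT list, given a Collection<DBField> named fields describing columns farm_id and weight (all values invented), a caller could do:

Map<String,String> farmLabels=new HashMap<>();
farmLabels.put("uuid-1", "Farm A");                 // UUID -> human readable label
ExportCSVQuery export=new ExportCSVQuery("", null);
export.setTablename("grow_out_batchestable");       // hypothetical table name
export.setFieldList(fields);                        // renders as "farm_id,weight"
export.setMapping("farm_id", farmLabels);
// getQuery() is then expected to render roughly:
// SELECT CASE farm_id WHEN 'uuid-1' THEN 'Farm A' END AS farm_id,weight
//   FROM grow_out_batchestable WHERE <condition built by getConditionString()>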

View File

@ -1,68 +1,173 @@
package org.gcube.application.perform.service.engine.impl;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.ServletContext;
import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportedTable;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.gcube.smartgears.ContextProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ImportedTableManager {
private static final Logger log= LoggerFactory.getLogger(ImportedTableManager.class);
private static List<ImportedTable> tables;
public void loadImportedData(ImportRoutineDescriptor desc) {
// private static HashMap<String,ImportedTable> tables;
private static Map<AnalysisType,Set<ImportedTable>> analysisConfiguration=new HashMap<>();
public static void importSchema(SchemaDefinition schema,String csvBasePath) throws IOException, SQLException, InternalException {
log.info("Loading schema {} ",schema);
String actualCSVPath=csvBasePath+"/"+schema.getCsvPath();
log.debug("CSV path : {} ",actualCSVPath);
ArrayList<DBField> csvFieldsDefinition=getCSVFieldsDefinition(actualCSVPath);
AnalysisType analysisType=schema.getRelatedAnalysis();
String tablename=(analysisType.getId()+schema.getRelatedDescription()).toLowerCase().replaceAll(" ", "_");
ImportedTable table=new ImportedTable(
tablename, new DBField(Types.BIGINT, schema.getRoutineIdFieldName()),
schema.getFarmUUIDField(),
schema.getAssociationUUIDField(),
schema.getCompanyUUIDField(),
schema.getBatchUUIDField(),
csvFieldsDefinition);
table.create();
if(!analysisConfiguration.containsKey(analysisType))
analysisConfiguration.put(schema.getRelatedAnalysis(), new HashSet<>());
analysisConfiguration.get(schema.getRelatedAnalysis()).add(table);
}
private static final long parse(String path, String description, ImportRoutineDescriptor routine) {
public Set<ImportedTable> getAnalysisSet(CSVExportRequest request){
throw new RuntimeException("Implement ME");
}
public void loadImportedData(ImportRoutineDescriptor desc) throws IOException, SQLException, InternalException {
log.info("Importing output for {} ",desc);
ComputationId computation=DMUtils.getComputation(desc);
Map<String,String> outputs=DMUtils.getOutputFiles(computation);
Connection conn=DataBaseManager.get().getConnection();
try {
for(Entry<String,String> entry:outputs.entrySet()) {
parse(entry.getValue(),entry.getKey(),desc,conn);
}
log.info("IMPORTED ALL FILES for {} ",desc);
conn.commit();
}finally {
conn.close();
}
}
private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException, InvalidRequestException {
Reader in = new FileReader(path);
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
AnalysisType type=new AnalysisType(routine);
try {
log.debug("Parsing file {} : {} ",description,path);
// Extract CSV Schema
ArrayList<String> csvSchema=new ArrayList<String>();
for(Entry<String,Integer> entry : parser.getHeaderMap().entrySet()) {
csvSchema.add(entry.getValue(), entry.getKey());
}
log.debug("CSV Schema is {} ",csvSchema);
long counter=0l;
//Get the right table
for(ImportedTable table:tables) {
if(table.matchesSchema(csvSchema)) {
log.debug("Mathing table is {} ",table.getTableName());
Query query=table.getQuery();
PreparedStatement psInsert=query.prepare(conn);
log.debug("Reading csvLines");
for(CSVRecord record:parser) {
DBQueryDescriptor desc=table.getSetRow(record.toMap(), routine.getId(), routine.getFarmId());
query.fill(psInsert, desc);
counter+=psInsert.executeUpdate();
}
log.debug("Inserted {} lines into {} for routine {} [FARM ID {}]",counter,table.getTableName(),routine.getId(),routine.getFarmId());
log.debug("Parsing file {} : {} ",description,path);
// Extract CSV Schema
ArrayList<String> csvSchema=new ArrayList<String>();
for(Entry<String,Integer> entry : parser.getHeaderMap().entrySet()) {
csvSchema.add(entry.getValue(), entry.getKey());
}
}
log.debug("CSV Schema is {} ",csvSchema);
long counter=0l;
//Get the right table
for(ImportedTable table:analysisConfiguration.get(type)) {
if(table.matchesSchema(csvSchema)) {
log.debug("Mathing table is {} ",table.getTableName());
Query query=table.getInsertQuery();
PreparedStatement psInsert=query.prepare(conn);
log.debug("Reading csvLines");
for(CSVRecord record:parser) {
DBQueryDescriptor desc=table.getSetRow(record.toMap(), routine.getId());
query.fill(psInsert, desc);
counter+=psInsert.executeUpdate();
}
log.debug("Inserted {} lines into {} for routine {} [FARM ID {}]",counter,table.getTableName(),routine.getId(),routine.getFarmId());
}
}
return counter;
}finally {
parser.close();
in.close();
}
}
// ************************** SCHEMA PARSING
private static final String FLOAT_REGEX="\\d*\\.\\d*";
private static final String INTEGER_REGEX="\\d*";
private static ArrayList<DBField> getCSVFieldsDefinition(String csvFile) throws IOException{
Reader in = null;
CSVParser parser= null;
try {
in=new FileReader(csvFile);
parser=CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
Map<String,Integer> headers=parser.getHeaderMap();
ArrayList<DBField> toReturn = new ArrayList<>(headers.size());
CSVRecord record=parser.getRecords().get(0);
for(Entry<String,Integer> header:headers.entrySet()) {
String value=record.get(header.getKey());
int type=Types.VARCHAR;
if(value.matches(FLOAT_REGEX)) type=Types.REAL;
else if(value.matches(INTEGER_REGEX)) type=Types.BIGINT;
toReturn.add(new DBField(type, header.getKey()));
}
return toReturn;
}finally{
if(in!=null) in.close();
if(parser!=null) parser.close();
}
}
}
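The column type inference in getCSVFieldsDefinition is driven only by the first data row: a value matching the float pattern becomes REAL, one matching the integer pattern becomes BIGINT, anything else stays VARCHAR. For example (values invented):

"7.25".matches(FLOAT_REGEX);     // true  -> Types.REAL
"1840".matches(FLOAT_REGEX);     // false (no decimal point)
"1840".matches(INTEGER_REGEX);   // true  -> Types.BIGINT
"FR-01".matches(INTEGER_REGEX);  // false -> Types.VARCHAR (default)
// note: an empty cell also matches INTEGER_REGEX ("\d*"), so blank first-row values are typed as BIGINT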

View File

@ -45,15 +45,15 @@ public class ImporterImpl implements Importer {
private static synchronized ISQueryDescriptor getISQueryDescriptor() {
if(isQueryDescriptor==null) {
isQueryDescriptor=
new ISQueryDescriptor(
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY));
}
return isQueryDescriptor;
}
// private static synchronized ISQueryDescriptor getISQueryDescriptor() {
// if(isQueryDescriptor==null) {
// isQueryDescriptor=
// new ISQueryDescriptor(
// LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
// LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY));
// }
// return isQueryDescriptor;
// }
private static final String getHostname() {
@ -73,7 +73,7 @@ public class ImporterImpl implements Importer {
public void init() throws InternalException{
try {
log.info("Initializing IMPORTER");
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
try {
@ -133,7 +133,7 @@ public class ImporterImpl implements Importer {
private void monitor(ImportRoutineDescriptor desc) throws DMException {
log.debug("Monitoring {} ",desc);
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(desc,getISQueryDescriptor()));
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(desc));
}
@ -177,7 +177,7 @@ public class ImporterImpl implements Importer {
add(ImportRoutine.fields.get(ImportRoutine.STATUS),ImportStatus.ACCEPTED.toString());
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
conn.setAutoCommit(true);
try {
@ -199,7 +199,7 @@ public class ImporterImpl implements Importer {
}
private ImportRoutineDescriptor getDescriptorById(Long id) throws SQLException, InternalException {
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
try {
PreparedStatement ps=Queries.GET_IMPORT_ROUTINE_BY_ID.get(conn,
@ -216,7 +216,7 @@ public class ImporterImpl implements Importer {
@Override
public List<ImportRoutineDescriptor> getDescriptors(DBQueryDescriptor desc) throws SQLException, InternalException {
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
try {
PreparedStatement ps=Queries.FILTER_IMPORTS.get(conn, desc);

View File

@ -24,24 +24,10 @@ public class MappingManagerImpl implements MappingManager {
private static final Logger log= LoggerFactory.getLogger(MappingManagerImpl.class);
private static ISQueryDescriptor isQueryDescriptor=null;
private static synchronized ISQueryDescriptor getISQueryDescriptor() {
if(isQueryDescriptor==null) {
isQueryDescriptor=
new ISQueryDescriptor(
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY));
}
return isQueryDescriptor;
}
@Override
public Batch getBatch(DBQueryDescriptor desc) throws SQLException, InternalException{
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
try{
conn.setAutoCommit(true);
@ -73,7 +59,7 @@ public class MappingManagerImpl implements MappingManager {
@Override
public Farm getFarm(DBQueryDescriptor desc) throws SQLException, InternalException{
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();

View File

@ -17,6 +17,14 @@ public class PerformanceManagerImpl implements PerformanceManager{
@Override
public String generateCSV(CSVExportRequest request) {
throw new RuntimeException("Not yet implemented");
// request.type -> set of tables
// for each table -> get CSV (farmid, selection)
}
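A possible shape for this method, following the plan in the comments above and reusing getAnalysisSet and ImportedTable.exportCSV from this commit; the return value (a ';'-separated list of produced file paths) and the error handling are assumptions, since the contract is not defined yet:

@Override
public String generateCSV(CSVExportRequest request) {
    try {
        // request.type -> set of tables registered for that analysis
        Set<ImportedTable> tables=new ImportedTableManager().getAnalysisSet(request);
        List<String> produced=new ArrayList<>();
        for(ImportedTable table:tables) {
            // for each table -> export the CSV filtered by farm ids / selection
            produced.add(table.exportCSV(request).getAbsolutePath());
        }
        return String.join(";", produced);
    } catch(Exception e) {
        throw new RuntimeException("Unable to generate CSV", e);
    }
}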
@Override

View File

@ -31,6 +31,10 @@ public class Queries {
public static final Query GET_BATCH_BY_ID=new Query("Select * from batches where id = ?",
new DBField[] {DBField.Batch.fields.get(DBField.Batch.BATCH_ID)});
public static final Query GET_BATCH_BY_FARM_ID=new Query("Select * from batches where farmid = ?",
new DBField[] {DBField.Batch.fields.get(DBField.Batch.FARM_ID)});
// Imports with lock = hostname or lock == null
public static final Query ORPHAN_IMPORTS=new Query("SELECT * from "+ImportRoutine.TABLE+" where "+ImportRoutine.LOCK+" = ? OR "+ImportRoutine.LOCK+" IS NULL ",

View File

@ -17,8 +17,8 @@ public class Query {
private static final Logger log= LoggerFactory.getLogger(Query.class);
private final String query;
private final ArrayList<DBField> psFields;
protected final String query;
protected final ArrayList<DBField> psFields;
public Query(String query,DBField[] fields) {
this.query=query;

View File

@ -0,0 +1,70 @@
package org.gcube.application.perform.service.engine.impl;
import java.util.Properties;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
public class SchemaDefinition {
private static final String DESCRIPTION="description";
private static final String FARM="farm";
private static final String ASSOCIATION="association";
private static final String BATCH="batch";
private static final String COMPANY="company";
private static final String ROUTINE_ID="routine";
private static final String CSV="csv";
public String getRelatedDescription() {
return relatedDescription;
}
public AnalysisType getRelatedAnalysis() {
return relatedAnalysis;
}
public String getCsvPath() {
return csvPath;
}
public String getFarmUUIDField() {
return farmUUIDField;
}
public String getAssociationUUIDField() {
return associationUUIDField;
}
public String getBatchUUIDField() {
return batchUUIDField;
}
public String getCompanyUUIDField() {
return companyUUIDField;
}
public void setCsvPath(String csvPath) {
this.csvPath = csvPath;
}
public SchemaDefinition(AnalysisType relatedAnalysis, Properties props) {
super();
this.relatedDescription = props.getProperty(DESCRIPTION);
this.relatedAnalysis = relatedAnalysis;
this.csvPath = props.getProperty(CSV);
this.farmUUIDField = props.getProperty(FARM);
this.associationUUIDField = props.getProperty(ASSOCIATION);
this.batchUUIDField = props.getProperty(BATCH);
this.companyUUIDField = props.getProperty(COMPANY);
this.routineIdFieldName=props.getProperty(ROUTINE_ID);
}
private String relatedDescription;
private AnalysisType relatedAnalysis;
private String csvPath;
private String farmUUIDField;
private String associationUUIDField;
private String batchUUIDField;
private String companyUUIDField;
private String routineIdFieldName;
public String getRoutineIdFieldName() {
return routineIdFieldName;
}
}

View File

@ -1,5 +1,25 @@
package org.gcube.application.perform.service.engine.model;
import java.util.Set;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
public class CSVExportRequest {
private AnalysisType type;
private Set<Long> farmIds;
public AnalysisType getType() {
return type;
}
public void setType(AnalysisType type) {
this.type = type;
}
public Set<Long> getFarmIds() {
return farmIds;
}
public void setFarmIds(Set<Long> farmIds) {
this.farmIds = farmIds;
}
}

View File

@ -113,6 +113,11 @@ public class DBField {
public static final String COMPANY_UUID="companyuuid";
public static final String ASSOCIATION_UUID="associationuuid";
public static final String FARM_LABEL="1234";
public static final String ASSOCIATION_LABEL="1234";
public static final String COMPANY_LABEL="1234";
static {
fields.put(FARM_ID, new DBField(Types.BIGINT,FARM_ID));
fields.put(COMPANY_ID, new DBField(Types.BIGINT,COMPANY_ID));

View File

@ -23,6 +23,11 @@ public class DBQueryDescriptor {
this.condition = condition;
}
public DBQueryDescriptor(DBField field, Object value) {
this();
add(field,value);
}
public String toString() {

View File

@ -0,0 +1,57 @@
package org.gcube.application.perform.service.engine.model.importer;
public class AnalysisType {
private String name;
private String id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public AnalysisType(String name, String id) {
super();
this.name = name;
this.id = id;
}
public AnalysisType(ImportRoutineDescriptor desc) {
this(desc.getBatch_type(),desc.getBatch_type());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AnalysisType other = (AnalysisType) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
return true;
}
}

View File

@ -1,34 +1,138 @@
package org.gcube.application.perform.service.engine.model.importer;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.impl.ExportCSVQuery;
import org.gcube.application.perform.service.engine.impl.ImporterImpl;
import org.gcube.application.perform.service.engine.impl.Queries;
import org.gcube.application.perform.service.engine.impl.Query;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ImportedTable {
private static final Logger log= LoggerFactory.getLogger(ImporterImpl.class);
/**
* CSV FILE labels -> DBField
*/
private Map<String,DBField> mappings;
private Map<String,DBField> labels;
private ArrayList<String> csvFields;
private ArrayList<String> csvFields; // Fields actually expected in csv
private String tablename;
private DBField routineIdField;
private Query query;
private String farmUUIDField;
private String associationUUIDField;
private String companyUUIDField;
private String batchUUIDField;
private Query insertQuery;
public ImportedTable(String tablename, DBField routineIdField, String farmUUIDField, String associationUUIDField,
String companyUUIDField, String batchUUIDField, ArrayList<DBField> csvFieldsDefinition) {
super();
this.tablename = tablename;
this.routineIdField = routineIdField;
this.farmUUIDField = farmUUIDField;
this.associationUUIDField = associationUUIDField;
this.companyUUIDField = companyUUIDField;
this.batchUUIDField = batchUUIDField;
// init
csvFields=new ArrayList<>();
labels=new HashMap<>();
for(DBField field:csvFieldsDefinition) {
String escaped=escapeString(field.getFieldName());
csvFields.add(escaped);
labels.put(field.getFieldName(), new DBField(field.getType(),escaped));
}
insertQuery=prepareInsertionQuery();
}
private Query prepareInsertionQuery() {
StringBuilder fieldList=new StringBuilder();
StringBuilder valueString=new StringBuilder();
ArrayList<DBField> queryFields=new ArrayList<>();
for(DBField f:labels.values()) {
queryFields.add(f);
fieldList.append(f.getFieldName()+",");
valueString.append("?,");
}
queryFields.add(routineIdField);
String insertSQL= String.format("INSERT INTO %1$s (%2$s) VALUES (%3$s)", tablename,
fieldList+routineIdField.getFieldName(),valueString+"?");
return new Query(insertSQL, queryFields.toArray(new DBField[queryFields.size()]));
}
public void create() throws SQLException, InternalException {
StringBuilder fieldDefinitions=new StringBuilder();
for(DBField f:labels.values()) {
String type="text";
switch(f.getType()) {
case Types.BIGINT : type="bigint";
break;
case Types.REAL : type="real";
break;
}
fieldDefinitions.append(f.getFieldName()+" "+type+",");
}
String standardDefinitions=
String.format( "%1$s bigint,"
+ "FOREIGN KEY (%1$s) REFERENCES "+ImportRoutine.TABLE+"("+ImportRoutine.ID+")",routineIdField.getFieldName());
String stmt=String.format("CREATE TABLE IF NOT EXISTS %1$s (%2$s, %3$s)",
tablename,fieldDefinitions.substring(0,fieldDefinitions.lastIndexOf(",")),standardDefinitions);
Connection conn=DataBaseManager.get().getConnection();
try {
conn.createStatement().execute(stmt);
}finally {
conn.close();
}
}
/**
@ -46,16 +150,16 @@ public class ImportedTable {
DBQueryDescriptor desc=new DBQueryDescriptor();
for(Entry<String,String> csvField:csvRow.entrySet()) {
DBField toSetField=mappings.get(csvField.getKey());
DBField toSetField=labels.get(csvField.getKey());
Object value=csvField.getValue();
switch(toSetField.getType()) {
case Types.BIGINT : value=Long.parseLong((String) value);
break;
case Types.REAL : value=Double.parseDouble((String) value);
break;
}
if(value!=null)
switch(toSetField.getType()) {
case Types.BIGINT : value=Long.parseLong((String) value);
break;
case Types.REAL : value=Double.parseDouble((String) value);
break;
}
desc.add(toSetField, value);
}
@ -63,36 +167,97 @@ public class ImportedTable {
return desc;
}
public Query getQuery() {
return query;
public Query getInsertQuery() {
return insertQuery;
}
public String getTableName() {
return tablename;
}
public File exportCSV(CSVExportRequest request) {
// USE CASE to replace values in
/**
* SELECT
CASE status
WHEN 'VS' THEN 'validated by subsidiary'
WHEN 'NA' THEN 'not acceptable'
WHEN 'D' THEN 'delisted'
ELSE 'validated'
END AS STATUS
FROM SUPP_STATUS
*/
private ExportCSVQuery getExportQuery() {
ExportCSVQuery query=new ExportCSVQuery("", null);
query.setFieldList(labels.values());
query.setTablename(tablename);
return query;
}
// get de-anonymized labels
// Create query
// Pass the result set to the CSV specifying labels
// final Appendable out = ...;
// final CSVPrinter printer = CSVFormat.DEFAULT.withHeader("H1", "H2").print(out)
// return file
throw new RuntimeException("Not implemented");
public File exportCSV(CSVExportRequest request) throws InvalidRequestException, SQLException, InternalException, IOException {
log.debug("Exporting {} from {} ",request, this);
Connection conn= DataBaseManager.get().getConnection();
FileWriter writer=null;
CSVPrinter printer=null;
try {
ExportCSVQuery exportQuery=getExportQuery();
Map<String,String> farmMapping=new HashMap<>();
Map<String,String> companyMapping=new HashMap<>();
Map<String,String> associationMapping=new HashMap<>();
Map<String,String> batchMapping=new HashMap<>();
PreparedStatement psFarm=Queries.GET_FARM_BY_ID.prepare(conn);
PreparedStatement psBatch=Queries.GET_BATCH_BY_FARM_ID.prepare(conn);
// GET Labels by Farm Id
for(Long farmid:request.getFarmIds()) {
ResultSet rsFarm=Queries.GET_FARM_BY_ID.fill(psFarm, new DBQueryDescriptor(DBField.Farm.fields.get(DBField.Farm.FARM_ID),farmid)).executeQuery();
if(! rsFarm.next())
log.warn("Unable to Find farmID "+farmid);
else {
farmMapping.put(rsFarm.getString(DBField.Farm.UUID), rsFarm.getString(DBField.Farm.FARM_LABEL));
companyMapping.put(rsFarm.getString(DBField.Farm.COMPANY_UUID),rsFarm.getString(DBField.Farm.COMPANY_LABEL));
associationMapping.put(rsFarm.getString(DBField.Farm.ASSOCIATION_UUID), rsFarm.getString(DBField.Farm.ASSOCIATION_LABEL));
ResultSet rsBatch=Queries.GET_BATCH_BY_FARM_ID.fill(psBatch, new DBQueryDescriptor(DBField.Batch.fields.get(DBField.Batch.FARM_ID),farmid)).executeQuery();
while(rsBatch.next())
batchMapping.put(rsBatch.getString(DBField.Batch.UUID), rsBatch.getString(DBField.Batch.BATCH_NAME));
}
}
exportQuery.setMapping(associationUUIDField, associationMapping);
exportQuery.setMapping(companyUUIDField, companyMapping);
exportQuery.setMapping(farmUUIDField, farmMapping);
exportQuery.setMapping(batchUUIDField, batchMapping);
exportQuery.setSelectionFilters(request);
log.trace("Performing actual query towards {} ",tablename);
String sqlExport=exportQuery.getQuery();
log.debug("Query is {} ",sqlExport);
ResultSet csvRs=conn.createStatement().executeQuery(sqlExport);
File toReturn=File.createTempFile("csv_out", ".csv");
writer=new FileWriter(toReturn);
printer = CSVFormat.DEFAULT.withHeader().print(writer);
printer.printRecords(csvRs);
return toReturn;
}finally {
conn.close();
if(writer!=null) {
writer.flush();
writer.close();
}
if(printer!=null) {
printer.flush();
printer.close();
}
}
}
private String escapeString(String fieldname) {
return fieldname;
}
}
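For a concrete sense of the SQL these builders emit, assume two CSV columns (weight typed REAL, batch_uuid typed text), a routine id field named internal_routine_id and a table named grow_out_batchestable (all names illustrative); create() and prepareInsertionQuery() then produce approximately:

CREATE TABLE IF NOT EXISTS grow_out_batchestable (weight real,batch_uuid text, internal_routine_id bigint,FOREIGN KEY (internal_routine_id) REFERENCES <ImportRoutine.TABLE>(<ImportRoutine.ID>))

INSERT INTO grow_out_batchestable (weight,batch_uuid,internal_routine_id) VALUES (?,?,?)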

View File

@ -0,0 +1,7 @@
description=BatchesTable
farm=farm_id
company=company_id
association=producer_association_id
batch=aggregated_batch_id
routine=internal_routine_id
csv=csv/Grow_out_Aggregated_Batch_Data_Entry_KPI.csv

View File

@ -12,7 +12,6 @@ import org.gcube.application.perform.service.engine.model.DBField.Batch;
import org.gcube.application.perform.service.engine.model.DBField.Farm;
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
import org.gcube.application.perform.service.engine.model.DBField.ImportedData;
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
public class InitializeDataBase {
@ -20,13 +19,9 @@ public class InitializeDataBase {
public static void main(String[] args) throws SQLException, InternalException, IOException {
TokenSetter.set("/gcube/preprod/preVRE");
LocalConfiguration.init(Paths.get("src/main/webapp/WEB-INF/config.properties").toUri().toURL());
ISQueryDescriptor desc= new ISQueryDescriptor(
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY));
DataBaseManager db=DataBaseManager.get(desc);
DataBaseManager db=DataBaseManager.get();
Connection conn=db.getConnection();
Statement stmt=conn.createStatement();
@ -82,25 +77,25 @@ public class InitializeDataBase {
// CREATE IMPORTED TABLES
//AnagraphicGrow
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI_aggregated.csv",
ImportedData.AnagraphicGrow.TABLE,ImportedData.AnagraphicGrow.FARM_ID,ImportedData.AnagraphicGrow.ROUTINE_ID).getCreateStatement());
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI.csv",
ImportedData.BatchesTable.TABLE,ImportedData.BatchesTable.FARM_ID,ImportedData.BatchesTable.ROUTINE_ID).getCreateStatement());
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_annual (2).csv",
ImportedData.AnnualTable.TABLE,ImportedData.AnnualTable.FARM_ID,ImportedData.AnnualTable.ROUTINE_ID).getCreateStatement());
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_antibiotics.csv",
ImportedData.AntibioticsTable.TABLE,ImportedData.AntibioticsTable.FARM_ID,ImportedData.AntibioticsTable.ROUTINE_ID).getCreateStatement());
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_antiparasitic.csv",
ImportedData.AntiparasiticTable.TABLE,ImportedData.AntiparasiticTable.FARM_ID,ImportedData.AntiparasiticTable.ROUTINE_ID).getCreateStatement());
stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_lethalincidents.csv",
ImportedData.LethalIncidentsTable.TABLE,ImportedData.LethalIncidentsTable.FARM_ID,ImportedData.LethalIncidentsTable.ROUTINE_ID).getCreateStatement());
// //AnagraphicGrow
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI_aggregated.csv",
// ImportedData.AnagraphicGrow.TABLE,ImportedData.AnagraphicGrow.FARM_ID,ImportedData.AnagraphicGrow.ROUTINE_ID).getCreateStatement());
//
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI.csv",
// ImportedData.BatchesTable.TABLE,ImportedData.BatchesTable.FARM_ID,ImportedData.BatchesTable.ROUTINE_ID).getCreateStatement());
//
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_annual (2).csv",
// ImportedData.AnnualTable.TABLE,ImportedData.AnnualTable.FARM_ID,ImportedData.AnnualTable.ROUTINE_ID).getCreateStatement());
//
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_antibiotics.csv",
// ImportedData.AntibioticsTable.TABLE,ImportedData.AntibioticsTable.FARM_ID,ImportedData.AntibioticsTable.ROUTINE_ID).getCreateStatement());
//
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_antiparasitic.csv",
// ImportedData.AntiparasiticTable.TABLE,ImportedData.AntiparasiticTable.FARM_ID,ImportedData.AntiparasiticTable.ROUTINE_ID).getCreateStatement());
//
// stmt.executeUpdate(new TableCreator("/home/fabio/Documents/work files/Perform/KPIs_lethalincidents.csv",
// ImportedData.LethalIncidentsTable.TABLE,ImportedData.LethalIncidentsTable.FARM_ID,ImportedData.LethalIncidentsTable.ROUTINE_ID).getCreateStatement());
//
// CREATE ANAGRAPHIC OUTPUT

View File

@ -0,0 +1,20 @@
package org.gcube.application.perform.service;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.SQLException;
import org.gcube.application.perform.service.engine.model.InternalException;
public class LoadSchemaTest {
public static void main(String[] args) throws MalformedURLException, IOException, SQLException, InternalException {
TokenSetter.set("/gcube/devsec");
LocalConfiguration.init(Paths.get("src/main/webapp/WEB-INF/config.properties").toUri().toURL());
PerformServiceLifecycleManager.initSchema("src/main/webapp/WEB-INF");
}
}

View File

@ -15,8 +15,7 @@ import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
public class TableCreator {
private static final String FLOAT_REGEX="\\d*\\.\\d*";
private static final String INTEGER_REGEX="\\d*";
@ -25,79 +24,77 @@ public class TableCreator {
private String farmIdField;
private String routineIdField;
public static void main(String[] args) throws IOException {
TableCreator creator=new TableCreator(
"/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI_aggregated.csv",
"dummy", "farmid", "routineid");
System.out.println(creator.getCreateStatement());
int maxFieldLength=0;
for(String path:Paths.get("/home/fabio/Documents/work files/Perform/").toFile().list()) {
int current=getMaxFieldLength("/home/fabio/Documents/work files/Perform/"+path);
maxFieldLength=current>maxFieldLength?current:maxFieldLength;
}
System.out.println("Max field Length is "+maxFieldLength);
}
public TableCreator(String path, String tableName, String farmIdField, String routineIdField) {
super();
this.path = path;
this.tableName = tableName;
this.farmIdField = farmIdField;
this.routineIdField = routineIdField;
}
public String getCreateStatement() throws IOException {
String fieldDefinitions=getFieldDefinitions(path);
String standardDefinitions=
String.format( "%1$s bigint, %2$s bigint,"+
"FOREIGN KEY (%1$s) REFERENCES farms(farmid),"
+ "FOREIGN KEY (%2$s) REFERENCES "+ImportRoutine.TABLE+"("+ImportRoutine.ID+")",farmIdField,routineIdField);
return String.format("CREATE TABLE IF NOT EXISTS %1$s (%2$s, %3$s)",
tableName,fieldDefinitions,standardDefinitions);
}
private static final String getFieldDefinitions(String path) throws IOException {
Reader in = new FileReader(path);
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
Map<String,Integer> headers=parser.getHeaderMap();
CSVRecord record=parser.getRecords().get(0);
StringBuilder toReturn=new StringBuilder();
for(Entry<String,Integer> header:headers.entrySet()) {
String value=record.get(header.getKey());
String type=null;
if(value.matches(FLOAT_REGEX)) type="real";
else if(value.matches(INTEGER_REGEX)) type="bigint";
else type="text";
String fieldName=header.getKey().toLowerCase().replaceAll(" ", "_");
toReturn.append(String.format("%1$s %2$s,", fieldName, type));
}
toReturn.deleteCharAt(toReturn.lastIndexOf(","));
return toReturn.toString();
}
private static final int getMaxFieldLength(String path) throws IOException {
Reader in = new FileReader(path);
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
Map<String,Integer> headers=parser.getHeaderMap();
int toReturn=0;
for(String field:headers.keySet()) {
toReturn=field.length()>toReturn?field.length():toReturn;
}
return toReturn;
}
// public static void main(String[] args) throws IOException {
// TableCreator creator=new TableCreator(
// "/home/fabio/Documents/work files/Perform/Grow_out_Aggregated_Batch_Data_Entry_KPI_aggregated.csv",
// "dummy", "farmid", "routineid");
// System.out.println(creator.getCreateStatement());
//
// int maxFieldLength=0;
// for(String path:Paths.get("/home/fabio/Documents/work files/Perform/").toFile().list()) {
// int current=getMaxFieldLength("/home/fabio/Documents/work files/Perform/"+path);
// maxFieldLength=current>maxFieldLength?current:maxFieldLength;
// }
// System.out.println("Max field Length is "+maxFieldLength);
// }
//
// public TableCreator(String path, String tableName, String routineIdField) {
// super();
// this.path = path;
// this.tableName = tableName;
// this.routineIdField = routineIdField;
// }
//
//
// public String getCreateStatement() throws IOException {
//
// String fieldDefinitions=getFieldDefinitions(path);
//
// String standardDefinitions=
// String.format( "%1$s bigint,"
// + "FOREIGN KEY (%1$s) REFERENCES "+ImportRoutine.TABLE+"("+ImportRoutine.ID+")",routineIdField);
//
// return String.format("CREATE TABLE IF NOT EXISTS %1$s (%2$s, %3$s)",
// tableName,fieldDefinitions,standardDefinitions);
// }
//
//
// private static final String getFieldDefinitions(String path) throws IOException {
//
// Reader in = new FileReader(path);
// CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
// Map<String,Integer> headers=parser.getHeaderMap();
//
// CSVRecord record=parser.getRecords().get(0);
//
// StringBuilder toReturn=new StringBuilder();
//
// for(Entry<String,Integer> header:headers.entrySet()) {
// String value=record.get(header.getKey());
// String type=null;
// if(value.matches(FLOAT_REGEX)) type="real";
// else if(value.matches(INTEGER_REGEX)) type="bigint";
// else type="text";
//
// String fieldName=header.getKey().toLowerCase().replaceAll(" ", "_");
//
//
// toReturn.append(String.format("%1$s %2$s,", fieldName, type));
// }
// toReturn.deleteCharAt(toReturn.lastIndexOf(","));
//
// return toReturn.toString();
// }
//
//
// private static final int getMaxFieldLength(String path) throws IOException {
// Reader in = new FileReader(path);
// CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
// Map<String,Integer> headers=parser.getHeaderMap();
// int toReturn=0;
// for(String field:headers.keySet()) {
// toReturn=field.length()>toReturn?field.length():toReturn;
// }
// return toReturn;
// }
}