package org.gcube.application.perform.service.engine.impl;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.gcube.application.perform.service.engine.DataBaseManager;
import org.gcube.application.perform.service.engine.PerformanceManager;
import org.gcube.application.perform.service.engine.dm.DMUtils;
import org.gcube.application.perform.service.engine.model.CSVExportRequest;
import org.gcube.application.perform.service.engine.model.DBField;
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
import org.gcube.application.perform.service.engine.model.InternalException;
import org.gcube.application.perform.service.engine.model.InvalidRequestException;
import org.gcube.application.perform.service.engine.model.importer.AnalysisType;
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
import org.gcube.application.perform.service.engine.model.importer.ImportedTable;
import org.gcube.application.perform.service.engine.utils.StorageUtils;
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PerformanceManagerImpl implements PerformanceManager{
|
|
|
|
|
|
private static final Logger log= LoggerFactory.getLogger(PerformanceManagerImpl.class);
|
|
|
|
|
|
private static Map<AnalysisType,Set<ImportedTable>> analysisConfiguration=new HashMap<>();
|
|
|
|
public static Map<AnalysisType, Set<ImportedTable>> getAnalysisConfiguration() {
|
|
return analysisConfiguration;
|
|
}
|
|
|
|
@Override
|
|
public Map<String, String> generateCSV(CSVExportRequest request) throws SQLException, InvalidRequestException, InternalException, IOException {
|
|
log.trace("Serving {} ",request);
|
|
HashMap<String,String> toReturn=new HashMap<>();
|
|
Set<ImportedTable> tables=getAnalysisSet(request);
|
|
log.debug("Found {} tables in configuration",tables.size());
|
|
for(ImportedTable t:tables) {
|
|
SchemaDefinition schema=t.getSchema();
|
|
if(schema.getAnalysisEnabled()) {
|
|
log.debug("Exporting {} : {} ",schema.getRelatedDescription(),t.getTableName());
|
|
File csv=t.exportCSV(request);
|
|
String storageId=StorageUtils.putOntoStorage(csv);
|
|
|
|
toReturn.put(t.getSchema().getRelatedDescription(), storageId);
|
|
}
|
|
}
|
|
return toReturn;
|
|
}
|
|
|
|
@Override
|
|
public void loadOutputData(ImportRoutineDescriptor desc) throws SQLException, InvalidRequestException, InternalException, IOException{
|
|
log.info("Importing output for {} ",desc);
|
|
ComputationId computation=DMUtils.getComputation(desc);
|
|
Map<String,String> outputs=DMUtils.getOutputFiles(computation);
|
|
Connection conn=DataBaseManager.get().getConnection();
|
|
try {
|
|
for(Entry<String,String> entry:outputs.entrySet()) {
|
|
parse(entry.getValue(),entry.getKey(),desc,conn);
|
|
}
|
|
log.info("IMPORTED ALL FILES for {} ",desc);
|
|
conn.commit();
|
|
}finally {
|
|
conn.close();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public static void importSchema(SchemaDefinition schema,String csvBasePath) throws IOException, SQLException, InternalException {
|
|
log.info("Loading schema {} ",schema);
|
|
|
|
String actualCSVPath=csvBasePath+"/"+schema.getCsvPath();
|
|
|
|
log.debug("CSV path : {} ",actualCSVPath);
|
|
|
|
ArrayList<DBField> csvFieldsDefinition=getCSVFieldsDefinition(actualCSVPath);
|
|
|
|
AnalysisType analysisType=schema.getRelatedAnalysis();
|
|
|
|
String tablename=(analysisType.getId()+schema.getRelatedDescription()).toLowerCase().replaceAll(" ", "_");
|
|
|
|
|
|
|
|
ImportedTable table=new ImportedTable(
|
|
tablename, schema,
|
|
csvFieldsDefinition);
|
|
|
|
table.create();
|
|
|
|
|
|
if(!analysisConfiguration.containsKey(analysisType))
|
|
analysisConfiguration.put(schema.getRelatedAnalysis(), new HashSet<>());
|
|
analysisConfiguration.get(schema.getRelatedAnalysis()).add(table);
|
|
|
|
}
|
|
|
|
|
|
|
|
static Set<ImportedTable> getAnalysisSet(CSVExportRequest request) throws InvalidRequestException{
|
|
AnalysisType type=request.getType();
|
|
if(!analysisConfiguration.containsKey(type))
|
|
throw new InvalidRequestException("Analysis Configuration not found for "+type);
|
|
return analysisConfiguration.get(request.getType());
|
|
}
|
|
|
|
|
|
|
|
|
|
private static final long parse(String path, String description, ImportRoutineDescriptor routine, Connection conn) throws IOException, SQLException, InvalidRequestException {
|
|
Reader in = new FileReader(path);
|
|
CSVParser parser= CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
|
|
|
|
AnalysisType type=new AnalysisType(routine);
|
|
|
|
try {
|
|
log.debug("Parsing file {} : {} ",description,path);
|
|
// Extract CSV Schema
|
|
ArrayList<String> csvSchema=new ArrayList<String>();
|
|
for(Entry<String,Integer> entry : parser.getHeaderMap().entrySet()) {
|
|
csvSchema.add(entry.getValue(), entry.getKey());
|
|
}
|
|
|
|
log.debug("CSV Schema is {} ",csvSchema);
|
|
|
|
long counter=0l;
|
|
//Get the right table
|
|
for(ImportedTable table:analysisConfiguration.get(type)) {
|
|
if(table.matchesSchema(csvSchema)) {
|
|
log.debug("Matching table is {} ",table.getTableName());
|
|
Query query=table.getInsertQuery();
|
|
PreparedStatement psInsert=query.prepare(conn);
|
|
log.debug("Reading csvLines");
|
|
for(CSVRecord record:parser) {
|
|
DBQueryDescriptor desc=table.getSetRow(record.toMap(), routine.getId());
|
|
query.fill(psInsert, desc);
|
|
counter+=psInsert.executeUpdate();
|
|
}
|
|
log.debug("Inserted {} lines into {} for routine {} [FARM ID {}]",counter,table.getTableName(),routine.getId(),routine.getFarmId());
|
|
}
|
|
}
|
|
return counter;
|
|
}finally {
|
|
parser.close();
|
|
in.close();
|
|
}
|
|
|
|
}
|
|
|
|
// ************************** SCHEMA PARSING
|
|
private static final String FLOAT_REGEX="\\d*\\.\\d*";
|
|
private static final String INTEGER_REGEX="\\d*";
|
|
|
|
|
|
private static ArrayList<DBField> getCSVFieldsDefinition(String csvFile) throws IOException{
|
|
|
|
Reader in = null;
|
|
CSVParser parser= null;
|
|
try {
|
|
in=new FileReader(csvFile);
|
|
parser=CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(in);
|
|
Map<String,Integer> headers=parser.getHeaderMap();
|
|
ArrayList<DBField> toReturn = new ArrayList<>(headers.size());
|
|
|
|
CSVRecord record=parser.getRecords().get(0);
|
|
|
|
for(Entry<String,Integer> header:headers.entrySet()) {
|
|
String value=record.get(header.getKey());
|
|
int type=Types.VARCHAR;
|
|
if(value.matches(FLOAT_REGEX)) type=Types.REAL;
|
|
else if(value.matches(INTEGER_REGEX)) type=Types.BIGINT;
|
|
toReturn.add(new DBField(type, header.getKey()));
|
|
}
|
|
return toReturn;
|
|
|
|
}finally{
|
|
if(in!=null) in.close();
|
|
if(parser!=null) parser.close();
|
|
}
|
|
}
|
|
|
|
|
|
}
|