ecological-engine/src/main/java/org/gcube/dataanalysis/ecoengine/processing/LocalSplitGenerator.java

package org.gcube.dataanalysis.ecoengine.processing;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.gcube.contentmanagement.graphtools.utils.HttpRequest;
import org.gcube.contentmanagement.graphtools.utils.MathFunctions;
import org.gcube.dataanalysis.ecoengine.configuration.ALG_PROPS;
import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.configuration.INFRASTRUCTURE;
import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.ResourceLoad;
import org.gcube.dataanalysis.ecoengine.connectors.livemonitor.Resources;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.interfaces.Generator;
import org.gcube.dataanalysis.ecoengine.interfaces.GenericAlgorithm;
import org.gcube.dataanalysis.ecoengine.interfaces.SpatialProbabilityDistributionTable;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory;
import org.hibernate.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
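/**
 * Local {@link Generator} that computes a species distribution by splitting
 * the geographical space into chunks and processing them on a fixed pool of
 * threads, one species at a time. Probability rows above a fixed threshold
 * are buffered and flushed to a database table, either synchronously after
 * each species or by a periodic background writer.
 */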
public class LocalSplitGenerator implements Generator {

    private static final Logger logger = LoggerFactory.getLogger(LocalSplitGenerator.class);

    private AlgorithmConfiguration config;
    private ExecutorService executorService;
    private int numberOfThreadsToUse;
    private boolean[] threadActivity;
    private SessionFactory dbHibConnection;
    private boolean stopInterrupt;
    private boolean flushInterrupt;
    private boolean forceflush;
    private SpatialProbabilityDistributionTable distributionModel;
    private int processedSpeciesCounter;
    private int spaceVectorsNumber;
    private List<Object> environmentVectors;
    private long lastTime;
    private int lastProcessedRecordsNumber;
    private int processedRecordsCounter;
    private float status;
    private int chunksize;
    private Timer writerScheduler;
    ConcurrentLinkedQueue<String> probabilityBuffer;
    String probabilityInsertionStatement = "insert into %1$s (speciesid,csquarecode,probability %ADDEDINFORMATION%) VALUES %2$s";
    public LocalSplitGenerator(AlgorithmConfiguration config) {
        setConfiguration(config);
        init();
    }

    public LocalSplitGenerator() {
    }
    @Override
    public float getStatus() {
        return status;
    }
    @Override
    public String getResourceLoad() {
        // records processed per second since the last call
        long tk = System.currentTimeMillis();
        double activity = Double.valueOf(processedRecordsCounter - lastProcessedRecordsNumber) * 1000.00 / Double.valueOf(tk - lastTime);
        lastTime = tk;
        lastProcessedRecordsNumber = processedRecordsCounter;
        ResourceLoad rs = new ResourceLoad(tk, activity);
        return rs.toString();
    }
    @Override
    public String getResources() {
        Resources res = new Resources();
        try {
            for (int i = 0; i < numberOfThreadsToUse; i++) {
                try {
                    double value = (threadActivity[i]) ? 100.00 : 0.00;
                    res.addResource("Thread_" + (i + 1), value);
                } catch (Exception e1) {
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (res.list != null)
            return HttpRequest.toJSon(res.list).replace("resId", "resID");
        else
            return "";
    }
    @Override
    public String getLoad() {
        long tk = System.currentTimeMillis();
        double activity = processedSpeciesCounter;
        ResourceLoad rs = new ResourceLoad(tk, activity);
        return rs.toString();
    }
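    /**
     * Initializes the generator: opens the database session, instantiates the
     * distribution model, prepares the probability buffer and, for models with
     * asynchronous writes, starts the periodic database writer.
     */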
    @Override
    public void init() {
        stopInterrupt = false;
        flushInterrupt = false;
        forceflush = false;
        initDBSession();
        try {
            initModel();
        } catch (Exception e) {
            logger.error("error", e);
        }
        probabilityBuffer = new ConcurrentLinkedQueue<String>();
        // prepend a comma to the model's additional column names, if any
        String addedinfo = distributionModel.getAdditionalMetaInformation();
        if (addedinfo == null)
            addedinfo = "";
        else
            addedinfo = "," + addedinfo.trim();
        probabilityInsertionStatement = probabilityInsertionStatement.replace("%ADDEDINFORMATION%", addedinfo);
        if (!distributionModel.isSynchronousProbabilityWrite()) {
            logger.trace("init()->insertion scheduler initialized");
            // initialize the scheduler for the insertions
            writerScheduler = new Timer();
            writerScheduler.schedule(new DatabaseWriter(), 0, AlgorithmConfiguration.refreshResourcesTime);
        }
    }
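    /**
     * Loads the distribution model class named in the algorithms properties
     * file and initializes it with the current configuration and database session.
     */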
    private void initModel() throws Exception {
        Properties p = AlgorithmConfiguration.getProperties(config.getConfigPath() + AlgorithmConfiguration.algorithmsFile);
        String objectclass = p.getProperty(config.getModel());
        distributionModel = (SpatialProbabilityDistributionTable) Class.forName(objectclass, true, config.getAlgorithmClassLoader()).newInstance();
        distributionModel.init(config, dbHibConnection);
    }
    @Override
    public void setConfiguration(AlgorithmConfiguration config) {
        this.config = config;
        if (config.getNumberOfResources() == 0)
            this.numberOfThreadsToUse = 1;
        else
            this.numberOfThreadsToUse = config.getNumberOfResources();
    }
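    /**
     * Creates the fixed thread pool and the per-thread activity flags used to
     * track which workers are still busy.
     */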
    public void initializeThreads() {
        // initialize threads and their activity state
        executorService = Executors.newFixedThreadPool(numberOfThreadsToUse);
        threadActivity = new boolean[numberOfThreadsToUse];
        // initialize to false
        for (int j = 0; j < threadActivity.length; j++) {
            threadActivity[j] = false;
        }
    }
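    /**
     * Opens the Hibernate session using the connection parameters taken from
     * the configuration (driver, user name, password and URL).
     */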
    public void initDBSession() {
        try {
            if ((config != null) && (config.getConfigPath() != null)) {
                String defaultDatabaseFile = config.getConfigPath() + AlgorithmConfiguration.defaultConnectionFile;
                config.setDatabaseDriver(config.getParam("DatabaseDriver"));
                config.setDatabaseUserName(config.getParam("DatabaseUserName"));
                config.setDatabasePassword(config.getParam("DatabasePassword"));
                config.setDatabaseURL(config.getParam("DatabaseURL"));
                dbHibConnection = DatabaseFactory.initDBConnection(defaultDatabaseFile, config);
            }
        } catch (Exception e) {
            logger.warn("error initializing db session", e);
        }
    }
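    /**
     * If the "CreateTable" parameter is set to true, drops any previous
     * distribution table and recreates it with the statement provided by the model.
     */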
    private void createTable() throws Exception {
        if (config.getParam("CreateTable") != null && config.getParam("CreateTable").equalsIgnoreCase("true")) {
            try {
                logger.trace("recreating table: " + "drop table " + config.getParam("DistributionTable"));
                DatabaseFactory.executeSQLUpdate("drop table " + config.getParam("DistributionTable"), dbHibConnection);
                logger.trace("recreating table->OK");
            } catch (Exception e) {
                logger.trace("recreating table->" + e.getLocalizedMessage());
            }
            DatabaseFactory.executeUpdateNoTransaction(distributionModel.getDistributionTableStatement(), config.getDatabaseDriver(), config.getDatabaseUserName(), config.getDatabasePassword(), config.getDatabaseURL(), true);
            logger.trace("createTable()->OK!");
        }
    }
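    /**
     * Stops the workers, waits for the asynchronous writer to flush the last
     * buffered rows, cancels the writer timer and closes the database session.
     */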
    public void shutdown() {
        // shutdown threads
        executorService.shutdown();
        // signal the writer that no more rows will be produced
        stopInterrupt = true;
        if (!distributionModel.isSynchronousProbabilityWrite()) {
            // wait for the asynchronous writer to perform its final flush
            while (!flushInterrupt) {
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                }
            }
        }
        if (writerScheduler != null) {
            try {
                writerScheduler.cancel();
                writerScheduler.purge();
            } catch (Exception e) {
            }
        }
        logger.trace("CLOSING CONNECTIONS");
        try {
            dbHibConnection.close();
        } catch (Exception eee) {
        }
    }
    // busy-waits until the thread at the given index is free
    private void wait4Thread(int index) {
        while (threadActivity[index]) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
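    /**
     * Main computation loop: loads the geographical and species vectors, then,
     * for each species, splits the space into chunks and assigns each chunk to
     * a worker thread in round-robin order. Progress is reported through
     * {@link #getStatus()}; for synchronous models the probability buffer is
     * filtered and flushed after each species.
     */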
    @Override
    public void compute() throws Exception {
        // INITIALIZATION
        long tstart = System.currentTimeMillis();
        try {
            logger.trace("generate->Using Local Computation algorithm " + distributionModel.getName());
            logger.trace("generate->Check for table creation");
            createTable();
            logger.trace("generate->Take area reference");
            // take the area reference vectors
            environmentVectors = DatabaseFactory.executeSQLQuery(distributionModel.getGeographicalInfoQuery(), dbHibConnection);
            logger.trace("generate->Take species reference");
            List<Object> speciesVectors = DatabaseFactory.executeSQLQuery(distributionModel.getMainInfoQuery(), dbHibConnection);
            logger.trace("generate->got all information");
            // calculate the chunk size and the number of chunks to take into account
            spaceVectorsNumber = environmentVectors.size();
            int speciesVectorNumber = speciesVectors.size();
            chunksize = spaceVectorsNumber / numberOfThreadsToUse;
            if (chunksize == 0)
                chunksize = 1;
            int numOfChunks = spaceVectorsNumber / chunksize;
            if ((spaceVectorsNumber % chunksize) != 0)
                numOfChunks += 1;
            logger.trace("generate->Calculation Started with " + numOfChunks + " chunks and " + speciesVectorNumber + " species");
            // initialize threads
            initializeThreads();
            // END INITIALIZATION
            // overall chunks counter
            int overallcounter = 0;
            processedSpeciesCounter = 0;
            // SPECIES CALCULATION
            // cycle through the species
            for (Object species : speciesVectors) {
                // calculation on multiple threads: thread selection index
                int currentThread = 0;
                // take time
                long computationT0 = System.currentTimeMillis();
                // pre-process for the single species
                distributionModel.singleStepPreprocess(species, spaceVectorsNumber);
                logger.trace("-------------------------------------------------> species " + distributionModel.getMainInfoID(species) + " - n. " + (processedSpeciesCounter + 1));
                // CALCULATION CORE
                for (int k = 0; k < numOfChunks; k++) {
                    // get the starting index of the chunk
                    int start = k * chunksize;
                    // wait for the selected thread to be free
                    wait4Thread(currentThread);
                    // start the calculation for this species and chunk on the thread
                    startNewTCalc(currentThread, species, start);
                    // increment the thread selection index, wrapping around the pool
                    currentThread++;
                    if (currentThread >= numberOfThreadsToUse) {
                        currentThread = 0;
                    }
                    // update the progress status, capped at 99% until completion
                    status = ((float) overallcounter / ((float) (speciesVectorNumber * numOfChunks))) * 100f;
                    if (status == 100)
                        status = 99f;
                    // logger.trace("STATUS->"+status+"%");
                    // increment the global chunk counter
                    overallcounter++;
                }
                // END OF CALCULATION CORE
                // wait for the last threads to finish
                for (int i = 0; i < numberOfThreadsToUse; i++) {
                    wait4Thread(i);
                }
                // for synchronous models, filter and flush the buffer right away
                if (distributionModel.isSynchronousProbabilityWrite()) {
                    probabilityBuffer = (ConcurrentLinkedQueue<String>) distributionModel.filterProbabilitySet((Queue<String>) probabilityBuffer);
                    DatabaseWriter dbw = new DatabaseWriter();
                    dbw.flushBuffer();
                }
                long computationT1 = System.currentTimeMillis();
                logger.trace("generate->Species Computation Finished in " + (computationT1 - computationT0) + " ms");
                // increment the count of processed species
                processedSpeciesCounter++;
                // post-process for the single species
                distributionModel.singleStepPostprocess(species, spaceVectorsNumber);
                // if the process was stopped then interrupt the processing
                if (stopInterrupt)
                    break;
            }
            long computationT2 = System.currentTimeMillis();
            logger.trace("generate->All Species Computed in " + (computationT2 - tstart) + " ms");
        } catch (Exception e) {
            logger.error("error", e);
            throw e;
        } finally {
            try {
                distributionModel.postProcess();
                // shutdown all
                shutdown();
            } catch (Exception e) {
            }
            // REPORT OVERALL ELAPSED TIME
            long tend = System.currentTimeMillis();
            long ttotal = tend - tstart;
            logger.warn("generate->Distribution Generator->Algorithm finished in: " + ((double) ttotal / (double) 60000) + " min\n");
            status = 100f;
        }
    }
    // marks the target thread as busy and submits the chunk calculation to the pool
    private void startNewTCalc(int index, Object speciesVector, int start) {
        threadActivity[index] = true;
        ThreadCalculator tc = new ThreadCalculator(index, speciesVector, start);
        executorService.submit(tc);
    }
    // THREAD SECTION
    // worker task: computes the probabilities of one species over one chunk of space vectors
    private class ThreadCalculator implements Callable<Integer> {
        int threadIndex;
        int spaceindex;
        Object speciesVector;

        public ThreadCalculator(int threadIndex, Object speciesVector, int start) {
            this.threadIndex = threadIndex;
            this.speciesVector = speciesVector;
            this.spaceindex = start;
        }

        public Integer call() {
            int max = Math.min(spaceindex + chunksize, spaceVectorsNumber);
            String speciesID = distributionModel.getMainInfoID(speciesVector);
            for (int i = spaceindex; i < max; i++) {
                float prob = distributionModel.calcProb(speciesVector, environmentVectors.get(i));
                String geographicalID = distributionModel.getGeographicalID(environmentVectors.get(i));
                // buffer only probabilities above the 0.1 threshold
                if (prob > 0.1) {
                    String additionalInformation = distributionModel.getAdditionalInformation(speciesVector, environmentVectors.get(i));
                    if (additionalInformation == null)
                        additionalInformation = "";
                    else if (additionalInformation.length() > 0)
                        additionalInformation = "," + additionalInformation.trim();
                    probabilityBuffer.offer("'" + speciesID + "','" + geographicalID + "','" + MathFunctions.roundDecimal(prob, 2) + "'" + additionalInformation);
                }
                processedRecordsCounter++;
            }
            // mark this thread as free again
            threadActivity[threadIndex] = false;
            return 0;
        }
    }
    // database insertion task, run periodically by the writer timer
    private class DatabaseWriter extends TimerTask {

        public DatabaseWriter() {
        }

        public void run() {
            try {
                if (forceflush) {
                    logger.trace("\t...flushing on db");
                    // flush the buffered rows
                    flushBuffer();
                    logger.trace("\t...finished flushing on db");
                    forceflush = false;
                }
                if (stopInterrupt) {
                    // final flush before the writer terminates
                    logger.trace("\t...finally flushing on db");
                    flushBuffer();
                    logger.trace("\t...finished finally flushing on db");
                    flushInterrupt = true;
                    this.cancel();
                } else if ((probabilityBuffer != null) && (probabilityBuffer.size() > AlgorithmConfiguration.chunkSize)) {
                    // logger.trace("\t...writing on db");
                    writeOnDB(AlgorithmConfiguration.chunkSize);
                    // logger.trace("\t...finished writing on db");
                }
            } catch (Throwable e) {
                logger.error("error", e);
                flushInterrupt = true;
            }
        }

        // writes the whole buffer to the database, one chunk at a time
        public void flushBuffer() {
            if ((probabilityBuffer != null) && (probabilityBuffer.size() > 0)) {
                while (probabilityBuffer.size() > AlgorithmConfiguration.chunkSize)
                    writeOnDB(AlgorithmConfiguration.chunkSize);
                writeOnDB(probabilityBuffer.size());
            }
        }

        // drains up to endIndex rows from the buffer and inserts them in a single statement
        private void writeOnDB(int endIndex) {
            if (endIndex > 0) {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < endIndex; i++) {
                    sb.append("(" + distributionModel.filterProbabiltyRow(probabilityBuffer.poll()) + ")");
                    if (i < endIndex - 1) {
                        sb.append(",");
                    }
                }
                String insertionString = String.format(probabilityInsertionStatement, config.getParam("DistributionTable"), sb.toString());
                try {
                    DatabaseFactory.executeSQLUpdate(insertionString, dbHibConnection);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                logger.trace("writeOnDB()->PROBABILITIES BUFFER REMAINING:" + probabilityBuffer.size());
            }
        }
    }
    @Override
    public ALG_PROPS[] getSupportedAlgorithms() {
        ALG_PROPS[] p = { ALG_PROPS.SPECIES_VS_CSQUARE_FROM_DATABASE, ALG_PROPS.PARALLEL_SPECIES_VS_CSQUARE_FROM_DATABASE };
        return p;
    }

    @Override
    public INFRASTRUCTURE getInfrastructure() {
        return INFRASTRUCTURE.LOCAL;
    }

    @Override
    public List<StatisticalType> getInputParameters() {
        return new ArrayList<StatisticalType>();
    }

    @Override
    public StatisticalType getOutput() {
        return distributionModel.getOutput();
    }

    @Override
    public GenericAlgorithm getAlgorithm() {
        return distributionModel;
    }

    @Override
    public String getDescription() {
        return "A generator based on tabular data production, which computes one species at a time and splits the spatial probability calculation on different threads";
    }
}
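
// A minimal usage sketch (assumptions: the AlgorithmConfiguration setters shown
// below exist as counterparts of the getters used in this class, and "MY_MODEL"
// is a hypothetical model key listed in the algorithms properties file):
//
//   AlgorithmConfiguration config = new AlgorithmConfiguration();
//   config.setConfigPath("./cfg/");                        // hypothetical config path
//   config.setModel("MY_MODEL");                           // hypothetical model key
//   config.setNumberOfResources(4);                        // number of worker threads
//   config.setParam("DatabaseDriver", "org.postgresql.Driver");
//   config.setParam("DatabaseUserName", "user");
//   config.setParam("DatabasePassword", "password");
//   config.setParam("DatabaseURL", "jdbc:postgresql://localhost/db");
//   config.setParam("DistributionTable", "hspec_output"); // hypothetical table name
//   config.setParam("CreateTable", "true");
//   LocalSplitGenerator generator = new LocalSplitGenerator(config);
//   generator.compute();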