// 2017-09-05 15:10:06 +02:00
package org.gcube.dataanalysis.seadatanet ;
import java.io.BufferedWriter ;
import java.io.File ;
import java.io.FileWriter ;
import java.util.ArrayList ;
import java.util.HashMap ;
import java.util.LinkedHashMap ;
import java.util.List ;
import java.util.UUID ;
// 2017-09-07 12:02:32 +02:00
import org.gcube.contentmanagement.graphtools.utils.HttpRequest ;
// 2017-09-05 15:10:06 +02:00
import org.gcube.contentmanagement.graphtools.utils.MathFunctions ;
import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger ;
import org.gcube.dataanalysis.ecoengine.datatypes.ColumnType ;
import org.gcube.dataanalysis.ecoengine.datatypes.DatabaseType ;
import org.gcube.dataanalysis.ecoengine.datatypes.InputTable ;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType ;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType ;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes ;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.TableTemplates ;
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm ;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseFactory ;
import org.gcube.dataanalysis.ecoengine.utils.DatabaseUtils ;
import org.geotoolkit.geometry.jts.JTS ;
import org.geotoolkit.referencing.CRS ;
import org.geotoolkit.referencing.crs.DefaultGeographicCRS ;
import org.hibernate.SessionFactory ;
import org.opengis.referencing.crs.CoordinateReferenceSystem ;
import org.opengis.referencing.operation.MathTransform ;
import com.vividsolutions.jts.geom.Coordinate ;
import com.vividsolutions.jts.geom.Geometry ;
import com.vividsolutions.jts.geom.GeometryFactory ;
import com.vividsolutions.jts.geom.LinearRing ;
import com.vividsolutions.jts.geom.Polygon ;
import com.vividsolutions.jts.geom.PrecisionModel ;
import com.vividsolutions.jts.geom.impl.CoordinateArraySequence ;
import com.vividsolutions.jts.io.WKTReader ;
public class SeaDataNetConnector_AutoCorrelation extends StandardLocalExternalAlgorithm {
// Statistics returned by DIVA, keyed by a human-readable label; filled by process() in insertion order
LinkedHashMap < String , String > statResultMap = new LinkedHashMap < String , String > ( ) ;
// Output entries (the statistics above plus the NetCDF file) assembled by getOutput()
LinkedHashMap < String , StatisticalType > outputDivaMap = new LinkedHashMap < String , StatisticalType > ( ) ;
// DB session opened in process() and closed in shutdown()
SessionFactory dbconnection ;
/**
 * Initialization hook; this implementation performs no work.
 *
 * @throws Exception declared by the signature; nothing in this body throws
 */
@Override
public void init ( ) throws Exception {
// Intentionally empty: the DB session is opened in process(), not here.
}
/**
 * Builds the human-readable description of this algorithm, including the
 * literature citation for the underlying DIVA model.
 *
 * @return the description text, assembled from fixed sentences
 */
@Override
public String getDescription() {
    StringBuilder description = new StringBuilder();
    // What the algorithm does and how this variant differs (automatic correlation / signal-to-noise estimate)
    description.append(" An algorithm to geographically interpolate punctual observation data. Based on a connector to the SeaDataNet infrastructure, this version automatically estimates the correlation between the data, and signal to noise ratio. ");
    description.append(" This algorithms invokes the Data-Interpolating Variational Analysis (DIVA) SeaDataNet service to interpolate spatial data. ");
    description.append(" The model uses GEBCO bathymetry data. ");
    // Input limit and output format
    description.append(" It can interpolate up to 10,000 points randomly taken from the input table. ");
    description.append(" As output, it produces a NetCDF file with a uniform grid of values. ");
    // Citation
    description.append(" The underlying model is described in Troupin et al. 2012, 'Generation of analysis and consistent error fields using the Data Interpolating Variational Analysis (Diva)', Ocean Modelling, 52-53, 90-101. ");
    return description.toString();
}
File outputfile ;
public static String InputTableP = " InputTable " ;
public static String LongitudeP = " Longitude " ;
public static String LatitudeP = " Latitude " ;
public static String QuantityP = " Quantity " ;
public static String ExtensionP = " ProjectionExtension " ;
public static String ResolutionP = " ProjectionResolution " ;
public static String DepthHeightP = " DepthOrHeightLevel " ;
public static String UseCurrentsP = " UseCurrentsInformation " ;
2017-09-07 12:02:32 +02:00
public static String CurrentsWeightP = " CurrentsVelocityWeight " ;
2017-09-05 15:10:06 +02:00
Double longMinVal = null ;
Double longMaxVal = null ;
Double latMinVal = null ;
Double latMaxVal = null ;
/**
 * Parses the WKT polygon describing the projection bounding box and stores its
 * limits in {@code longMinVal}, {@code longMaxVal}, {@code latMinVal} and {@code latMaxVal}.
 *
 * The first and third vertices of the polygon are taken as opposite corners of the box.
 * Longitudes are clamped to [-180, 180] and latitudes to [-85, 85].
 *
 * @param WKTSquare WKT text of the bounding polygon
 * @throws Exception if the text is null, cannot be parsed as WKT, is not a polygon,
 *                   or has fewer than three vertices
 */
public void parseExtension(String WKTSquare) throws Exception {
    if (WKTSquare == null) {
        throw new IllegalArgumentException("The projection extension (WKT polygon) is missing");
    }
    WKTReader wktr = new WKTReader();
    Geometry parsed = wktr.read(WKTSquare);
    if (!(parsed instanceof Polygon)) {
        throw new IllegalArgumentException("The projection extension must be a WKT POLYGON: " + WKTSquare);
    }
    Coordinate[] coordinates = parsed.getCoordinates();
    // An empty polygon has no vertices; indexes 0 and 2 below must exist.
    if (coordinates.length < 3) {
        throw new IllegalArgumentException("The projection extension has too few vertices: " + WKTSquare);
    }

    double lat1 = coordinates[0].y;
    double lat2 = coordinates[2].y;
    double lon1 = coordinates[0].x;
    double lon2 = coordinates[2].x;

    // Each bound is derived from its own axis and clamped into its own field.
    longMinVal = clampToRange(Math.min(lon1, lon2), -180d, 180d);
    longMaxVal = clampToRange(Math.max(lon1, lon2), -180d, 180d);
    latMinVal = clampToRange(Math.min(lat1, lat2), -85d, 85d);
    latMaxVal = clampToRange(Math.max(lat1, lat2), -85d, 85d);
}

// Restricts value to the closed interval [min, max].
private static double clampToRange(double value, double min, double max) {
    return Math.max(min, Math.min(max, value));
}
@Override
protected void process ( ) throws Exception {
File neofile = null ;
File fileForDiva = null ;
try {
String outpath = config . getPersistencePath ( ) ;
neofile = new File ( outpath , " seadn_diva_ " + UUID . randomUUID ( ) + " .nc " ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input Parameters " ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input Table: " + config . getParam ( InputTableP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input Long: " + config . getParam ( LongitudeP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input Lat: " + config . getParam ( LatitudeP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input Qt: " + config . getParam ( QuantityP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Extension: " + config . getParam ( ExtensionP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Resolution: " + config . getParam ( ResolutionP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Depth-Heigh Level: " + config . getParam ( DepthHeightP ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Use currents: " + config . getParam ( UseCurrentsP ) ) ;
2017-09-07 12:02:32 +02:00
AnalysisLogger . getLogger ( ) . debug ( " Currents weight: " + config . getParam ( CurrentsWeightP ) ) ;
2017-09-07 13:00:34 +02:00
AnalysisLogger . getLogger ( ) . debug ( " Converting weights " ) ;
int currentsWeight = ( int ) Math . round ( Double . parseDouble ( " " + config . getParam ( CurrentsWeightP ) ) ) ;
2017-09-07 12:02:32 +02:00
if ( currentsWeight < 0 )
currentsWeight = 0 ;
else
currentsWeight = currentsWeight * 2000 / 100 ;
AnalysisLogger . getLogger ( ) . debug ( " Recalculated currents weight: " + currentsWeight ) ;
2017-09-05 15:10:06 +02:00
AnalysisLogger . getLogger ( ) . debug ( " Checking parameters consistency " ) ;
//->try - catch to manage case of NULL values
//->the check on the values has been put before the initialization of the connection
parseExtension ( config . getParam ( ExtensionP ) ) ;
Double resolutionVal = null ;
Double depthLevelVal = null ;
AnalysisLogger . getLogger ( ) . debug ( " Parse extension: " ) ;
AnalysisLogger . getLogger ( ) . debug ( " X: [ " + longMinVal + " , " + longMaxVal + " ] " ) ;
AnalysisLogger . getLogger ( ) . debug ( " Y: [ " + latMinVal + " , " + latMaxVal + " ] " ) ;
AnalysisLogger . getLogger ( ) . debug ( " Parsing use currents choice " ) ;
Boolean useCurrents = Boolean . valueOf ( config . getParam ( UseCurrentsP ) ) ;
try {
resolutionVal = Double . parseDouble ( config . getParam ( ResolutionP ) ) ;
depthLevelVal = Double . parseDouble ( config . getParam ( DepthHeightP ) ) ;
if ( depthLevelVal < 0 )
throw new Exception ( " Depth-Heigh Level cannot be negative " ) ;
} catch ( NumberFormatException e ) {
throw new Exception ( " Parameter values are incomplete " ) ;
}
AnalysisLogger . getLogger ( ) . debug ( " Parameters are OK " ) ;
AnalysisLogger . getLogger ( ) . debug ( " Initializing DB connection " ) ;
dbconnection = DatabaseUtils . initDBSession ( config ) ;
//->set limit to 100 000 - maximum allowed by DIVA
String query = " select "
+ config . getParam ( LongitudeP ) + " , " + config . getParam ( LatitudeP ) + " , " + config . getParam ( QuantityP ) + " From " + getInputParameter ( InputTableP ) +
" ORDER BY RANDOM() limit 10000 " ;
//->indicate the status of the computation
status = 10 ;
AnalysisLogger . getLogger ( ) . debug ( " Query for extracting data from the DB: " + query ) ;
List < Object > dataList = DatabaseFactory . executeSQLQuery ( query , dbconnection ) ;
int ndata = dataList . size ( ) ;
fileForDiva = new File ( outpath , " file_for_diva_ " + UUID . randomUUID ( ) + " .txt " ) ;
BufferedWriter fileWriterDiva = new BufferedWriter ( new FileWriter ( fileForDiva ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Writing input file in: " + fileForDiva . getAbsolutePath ( ) ) ;
for ( Object o : dataList ) {
Object [ ] oarray = ( Object [ ] ) o ;
fileWriterDiva . write ( " " + oarray [ 0 ] + " " + oarray [ 1 ] + " " + oarray [ 2 ] + " \ n " ) ;
}
fileWriterDiva . close ( ) ;
AnalysisLogger . getLogger ( ) . debug ( " Sending data to DIVA: Uploading " + ndata + " records " ) ;
// integration DivaHttpClient
// UPLOADFILE for DIVA
//->*use the HTTPClient Class methods
DivaFilePostResponse response = DivaHTTPClient . uploadFile ( fileForDiva ) ;
AnalysisLogger . getLogger ( ) . debug ( " DIVA Server Response for the Upload: \ n " + response . getSessionid ( ) ) ;
status = 30 ;
int velocityuploadingOK = 200 ;
2017-09-07 12:02:32 +02:00
if ( useCurrents & & currentsWeight > 0 ) {
String currentsFile = " http://data.d4science.org/QW43dXd0RDRIYUJUOENpNXF1ekoxNnFLeUdSTjZQWGZHbWJQNStIS0N6Yz0 " ; //under VRE Folders->BiodiversityLab
String tempFile = " currents " + UUID . randomUUID ( ) + " .nc " ;
AnalysisLogger . getLogger ( ) . debug ( " Downloading oceans currents file from " + currentsFile ) ;
HttpRequest . downloadFile ( currentsFile , tempFile ) ;
File tempcurrentsfile = new File ( tempFile ) ;
AnalysisLogger . getLogger ( ) . debug ( " Sending ocean currents to DIVA - local File exists " + tempcurrentsfile . exists ( ) ) ;
velocityuploadingOK = DivaHTTPClient . postVelocityFiles ( response . getSessionid ( ) , " u " , " v " , tempcurrentsfile ) ;
AnalysisLogger . getLogger ( ) . debug ( " Done - Sending ocean currents to DIVA " ) ;
tempcurrentsfile . delete ( ) ;
}
//velocityuploadingOK = DivaHTTPClient.postVelocityFiles(response.getSessionid(), "u", "v", new File ("./example.nc"));
2017-09-05 15:10:06 +02:00
if ( velocityuploadingOK ! = 200 )
throw new Exception ( " Could not upload currents files. Response " + velocityuploadingOK ) ;
AnalysisLogger . getLogger ( ) . debug ( " Requesting analysis for data correlation to DIVA... " ) ;
DivaFitResponse fitResponse = DivaHTTPClient . getFit ( response . getSessionid ( ) ) ;
AnalysisLogger . getLogger ( ) . debug ( " Response from DIVA FIT: " + fitResponse ) ;
status = 50 ;
AnalysisLogger . getLogger ( ) . debug ( " Requesting analysis to DIVA... " ) ;
long t0 = System . currentTimeMillis ( ) ;
2017-09-07 12:02:32 +02:00
DivaAnalysisGetResponse respAnalysis = DivaHTTPClient . getAnalysis ( response . getSessionid ( ) , fitResponse . getLength_scale ( ) , fitResponse . getSignal_to_noise ( ) , longMinVal , longMaxVal , resolutionVal , latMinVal , latMaxVal , resolutionVal , depthLevelVal , useCurrents , currentsWeight ) ;
2017-09-05 15:10:06 +02:00
AnalysisLogger . getLogger ( ) . debug ( " Response from DIVA analysis: " + respAnalysis ) ;
long t1 = System . currentTimeMillis ( ) ;
//->Record the time of the analysis
AnalysisLogger . getLogger ( ) . debug ( " Analysis finished in " + ( t1 - t0 ) + " ms " ) ;
status = 80 ;
//->the number of observations is now an integer
statResultMap . put ( " Minimum value estimated by the model " , " " + respAnalysis . getVmin ( ) ) ;
statResultMap . put ( " Maximum value estimated by the model " , " " + respAnalysis . getVmax ( ) ) ;
statResultMap . put ( " Number of observations used " , " " + respAnalysis . getStat_obs_count_used ( ) ) ;
statResultMap . put ( " Signal-to-noise ratio " , " " + fitResponse . getSignal_to_noise ( ) ) ;
statResultMap . put ( " Geographical length of the correlation (deg) " , " " + fitResponse . getLength_scale ( ) ) ;
2017-09-07 12:02:32 +02:00
statResultMap . put ( " Quality of the automatic signal-to-noise ratio estimate " , " " + fitResponse . getFit_quality ( ) ) ;
2017-09-05 15:10:06 +02:00
//statResultMap.put("A posteriori estimate of signal-to-noise ratio", "" + respAnalysis.getStat_posteriori_stn());
AnalysisLogger . getLogger ( ) . debug ( " Map of results to be returned: " + statResultMap ) ;
AnalysisLogger . getLogger ( ) . debug ( " Downloading result file in " + neofile . getAbsolutePath ( ) ) ;
// DOWNLOAD FILE
DivaHTTPClient . downloadFileDiva ( respAnalysis . getIdentifier ( ) , neofile . getAbsolutePath ( ) ) ;
//->*put the output file in the output object
outputfile = neofile ;
AnalysisLogger . getLogger ( ) . debug ( " Downloading finished " ) ;
AnalysisLogger . getLogger ( ) . debug ( " FINAL RESULT MAP: " + statResultMap . toString ( ) ) ;
} catch ( Exception e ) {
//->*ONLY in case of errors, delete the output file
if ( neofile . exists ( ) ) {
neofile . delete ( ) ;
AnalysisLogger . getLogger ( ) . debug ( " Output file " + neofile . getAbsolutePath ( ) + " deleted! " ) ;
}
throw e ;
} finally {
//->*in any case, delete the input file because we don't need it anymore
if ( fileForDiva . exists ( ) ) {
fileForDiva . delete ( ) ;
AnalysisLogger . getLogger ( ) . debug ( " Input file " + fileForDiva . getAbsolutePath ( ) + " deleted! " ) ;
}
AnalysisLogger . getLogger ( ) . debug ( " DIVA process finished " ) ;
}
}
@Override
protected void setInputParameters ( ) {
List < TableTemplates > templates = new ArrayList < TableTemplates > ( ) ;
templates . add ( TableTemplates . GENERIC ) ;
InputTable tinput = new InputTable ( templates , InputTableP , " Input tabular resource. Up to 10,000 points will be randomly taken from this table. " ) ;
inputs . add ( tinput ) ;
ColumnType p1 = new ColumnType ( InputTableP , LongitudeP , " The column containing longitude decimal values " , " longitude " , false ) ;
ColumnType p2 = new ColumnType ( InputTableP , LatitudeP , " The column containing latitude decimal values " , " latitude " , false ) ;
ColumnType p3 = new ColumnType ( InputTableP , QuantityP , " The column containing the values to geographycally interpolate " , " quantity " , false ) ;
PrimitiveType p4 = new PrimitiveType ( String . class . getName ( ) , null , PrimitiveTypes . STRING , ExtensionP , " Bounding Box of the projection. Values outside the (-180,+80;-90,+90) range will be automatically adjusted to this range. [WKT_BOX] " , " POLYGON((-206.71875 133.59375,-206.71875 -147.65625,212.34375 -147.65625,212.34375 133.59375,-206.71875 133.59375)) " ) ;
PrimitiveType p5 = new PrimitiveType ( Double . class . getName ( ) , null , PrimitiveTypes . NUMBER , ResolutionP , " Resolution of the projection (minimum 0.1deg. - maximum 10deg.) " , " 1 " ) ;
PrimitiveType p6 = new PrimitiveType ( Double . class . getName ( ) , null , PrimitiveTypes . NUMBER , DepthHeightP , " Depth (a negative value) or height (a positive value) of the observations (in meters). " , " 0 " ) ;
PrimitiveType p7 = new PrimitiveType ( Boolean . class . getName ( ) , null , PrimitiveTypes . BOOLEAN , UseCurrentsP , " Involve information about average annual global currents velocity. " , " true " ) ;
2017-09-07 12:02:32 +02:00
PrimitiveType p8 = new PrimitiveType ( Double . class . getName ( ) , null , PrimitiveTypes . NUMBER , CurrentsWeightP , " The weight of the currents velocity in the analysis in percentace (0-100) " , " 50 " ) ;
2017-09-05 15:10:06 +02:00
inputs . add ( p1 ) ;
inputs . add ( p2 ) ;
inputs . add ( p3 ) ;
inputs . add ( p4 ) ;
inputs . add ( p5 ) ;
inputs . add ( p6 ) ;
inputs . add ( p7 ) ;
2017-09-07 12:02:32 +02:00
inputs . add ( p8 ) ;
2017-09-05 15:10:06 +02:00
DatabaseType . addDefaultDBPars ( inputs ) ;
}
@Override
public void shutdown ( ) {
if ( dbconnection ! = null )
dbconnection . close ( ) ;
}
/**
 * Packages the results of the computation into a single MAP output: one STRING entry per
 * statistic in {@code statResultMap}, plus the NetCDF output file.
 *
 * @return a MAP-typed {@code PrimitiveType} wrapping {@code outputDivaMap}
 */
public StatisticalType getOutput() {
    PrimitiveType netcdfOutput = new PrimitiveType(File.class.getName(), outputfile, PrimitiveTypes.FILE, " NetCDFOutputFile ", " Output file in NetCDF format ");

    // Each statistic becomes a STRING output whose name and description are both the label.
    for (java.util.Map.Entry<String, String> statistic : statResultMap.entrySet()) {
        String label = statistic.getKey();
        PrimitiveType statisticOutput = new PrimitiveType(String.class.getName(), statistic.getValue(), PrimitiveTypes.STRING, label, label);
        outputDivaMap.put(label, statisticOutput);
    }

    // The file goes in after the statistics.
    outputDivaMap.put(" Netcdf output file ", netcdfOutput);

    return new PrimitiveType(HashMap.class.getName(), outputDivaMap, PrimitiveTypes.MAP, " Diva results ", " Output of DIVA fit ");
}
}