package eu.dnetlib.dhp.continuous_validator;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.validator2.validation.XMLApplicationProfile;
import eu.dnetlib.validator2.validation.guideline.openaire.*;
import eu.dnetlib.validator2.validation.utils.TestUtils;
import scala.Option;
public class ContinuousValidator {
public static final String TEST_FILES_V4_DIR = TestUtils . TEST_FILES_BASE_DIR + " openaireguidelinesV4/ " ;
2023-12-15 17:29:38 +01:00
public static final String RESULTS_FILE_NAME = " results.json " ;
2023-12-15 14:53:31 +01:00
private static final org . slf4j . Logger logger = LoggerFactory . getLogger ( ContinuousValidator . class ) ;
private static final String parametersFile = " input_continuous_validator_parameters.json " ;
private static final boolean SHOULD_GET_ARGUMENTS_FROM_FILE = true ; // It throws an error for now..
public static void main ( String [ ] args ) {
ArgumentApplicationParser parser = null ;
String sparkMaster = null ;
Boolean isSparkSessionManaged = false ;
String parquet_file_path = null ;
String guidelines = null ;
String outputPath = null ;
if ( SHOULD_GET_ARGUMENTS_FROM_FILE ) {
try {
String jsonConfiguration = IOUtils
. toString (
Objects
. requireNonNull (
ContinuousValidator . class
. getResourceAsStream ( " /eu/dnetlib/dhp/continuous_validator/ " + parametersFile ) ) ,
StandardCharsets . UTF_8 ) ;
parser = new ArgumentApplicationParser ( jsonConfiguration ) ;
parser . parseArgument ( args ) ;
} catch ( Exception e ) {
logger . error ( " Error when parsing the parameters! " , e ) ;
return ;
}
2023-12-15 17:29:38 +01:00
isSparkSessionManaged = Optional
. ofNullable ( parser . get ( " isSparkSessionManaged " ) ) // This param is not mandatory, so it may be null.
. map ( Boolean : : valueOf )
. orElse ( Boolean . TRUE ) ;
logger . info ( " isSparkSessionManaged: {} " , isSparkSessionManaged ) ;
// This is needed to implement a unit test in which the spark session is created in the context of the
// unit test itself rather than inside the spark application"
2023-12-15 14:53:31 +01:00
parquet_file_path = parser . get ( " parquet_file_path " ) ;
if ( parquet_file_path = = null ) {
logger . error ( " The \" parquet_file_path \" was not retrieved from the parameters file: " + parametersFile ) ;
return ;
}
2023-12-18 09:57:40 +01:00
guidelines = parser . get ( " openaire_guidelines " ) ;
2023-12-15 14:53:31 +01:00
if ( guidelines = = null ) {
2023-12-18 15:46:36 +01:00
logger
. error ( " The \" openaire_guidelines \" was not retrieved from the parameters file: " + parametersFile ) ;
2023-12-15 14:53:31 +01:00
return ;
}
outputPath = parser . get ( " outputPath " ) ;
if ( outputPath = = null ) {
logger . error ( " The \" outputPath \" was not retrieved from the parameters file: " + parametersFile ) ;
return ;
}
sparkMaster = " local[*] " ;
} else {
if ( args . length ! = 4 ) {
String errorMsg = " Wrong number of arguments given! Please run the app like so: java -jar target/dhp-continuous-validation-1.0.0-SNAPSHOT.jar <sparkMaster> <parquetFileFullPath> <guidelines> <outputPath> " ;
System . err . println ( errorMsg ) ;
logger . error ( errorMsg ) ;
System . exit ( 1 ) ;
}
sparkMaster = args [ 0 ] ;
logger . info ( " Will use this Spark master: \" " + sparkMaster + " \" . " ) ;
parquet_file_path = args [ 1 ] ;
guidelines = args [ 2 ] ;
outputPath = args [ 3 ] ;
if ( ! outputPath . endsWith ( " / " ) )
outputPath + = " / " ;
}
logger
. info (
" Will validate the contents of parquetFile: \" " + parquet_file_path + " \" , against guidelines: \" "
2023-12-15 17:29:38 +01:00
+ guidelines + " \" " + " and will output the results in: " + outputPath + RESULTS_FILE_NAME ) ;
2023-12-15 14:53:31 +01:00
2023-12-18 09:57:40 +01:00
AbstractOpenAireProfile profile ;
switch ( guidelines ) {
case " 4.0 " :
profile = new LiteratureGuidelinesV4Profile ( ) ;
break ;
case " 3.0 " :
profile = new LiteratureGuidelinesV3Profile ( ) ;
break ;
case " 2.0 " :
profile = new DataArchiveGuidelinesV2Profile ( ) ;
break ;
case " fair_data " :
profile = new FAIR_Data_GuidelinesProfile ( ) ;
break ;
case " fair_literature_v4 " :
profile = new FAIR_Literature_GuidelinesV4Profile ( ) ;
break ;
default :
logger . error ( " Invalid OpenAIRE Guidelines were given: " + guidelines ) ;
return ;
}
2023-12-15 14:53:31 +01:00
SparkConf conf = new SparkConf ( ) ;
conf . setAppName ( ContinuousValidator . class . getSimpleName ( ) ) ;
String finalParquet_file_path = parquet_file_path ;
String finalOutputPath = outputPath ;
runWithSparkSession ( conf , isSparkSessionManaged , spark - > {
// Use a new instance of Document Builder in each worker, as it is not thread-safe.
MapFunction < Row , XMLApplicationProfile . ValidationResult > validateMapFunction = row - > profile
. validate (
row . getAs ( " id " ) . toString ( ) ,
TestUtils
. getDocumentBuilder ( )
. parse ( IOUtils . toInputStream ( row . getAs ( " body " ) . toString ( ) , StandardCharsets . UTF_8 ) ) ) ;
2023-12-18 15:46:36 +01:00
spark
. read ( )
. parquet ( finalParquet_file_path )
. filter ( " encoding = 'XML' and id != NULL and body != null " )
. map ( validateMapFunction , Encoders . bean ( XMLApplicationProfile . ValidationResult . class ) )
2023-12-15 14:53:31 +01:00
. write ( )
. option ( " compression " , " gzip " )
. mode ( SaveMode . Overwrite )
2023-12-15 17:29:38 +01:00
. json ( finalOutputPath + RESULTS_FILE_NAME ) ; // The filename should be the name of the input-file or the
2023-12-15 14:53:31 +01:00
} ) ;
}
}