- Add documentation.

- Code polishing/cleanup.
Lampros Smyrnaios 2024-02-02 14:18:46 +02:00
parent b5f4d37827
commit cbe7c6734a
8 changed files with 69 additions and 54 deletions

View File

@@ -2,10 +2,32 @@
This module is responsible for deploying an **Oozie Workflow** (on the desired cluster), which executes a **Spark** action.<br>
This action takes the HDFS path of a directory of parquet files containing metadata records and applies the validation process to all of them, in parallel. It then outputs the results, in JSON format, to the given output directory.<br>
The validation process is powered by the [**uoa-validator-engine2**](https://code-repo.d4science.org/MaDgIK/uoa-validator-engine2) software.<br>
The validation process is powered by the "[**uoa-validator-engine2**](https://code-repo.d4science.org/MaDgIK/uoa-validator-engine2)" software,
which is included as a dependency inside the main "pom.xml" file.<br>
### Install and run
### Configure the workflow
Set the desired values for the parameters defined in the "/src/main/resources/eu/dnetlib/dhp/continuous_validation/oozie_app/workflow.xml" file; a quick way to review the current values is shown after the parameter list below.<br>
The most important parameters are the following:
- ***parquet_path***: the full path of the input parquet file, or of a directory with parquet files, to be processed
- ***openaire_guidelines***: the version of the OpenAIRE Guidelines to validate the records against; valid values: "4.0", "3.0", "2.0", "fair_data", "fair_literature_v4"
- ***output_path***: the path of the output directory where the result JSON files will be stored. Be careful to use a base directory different from the one this module runs in, as that base directory is deleted during a new deployment.
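
As a quick, optional check, the current parameter values can be printed with `grep`; this is only a convenience sketch and assumes the command is run from the module's root directory with the resources layout matching the path above:

```bash
# Print the current values of the three main parameters before deploying.
# Assumes this is run from the module's root directory.
grep -A 2 -E "parquet_path|openaire_guidelines|output_path" \
    src/main/resources/eu/dnetlib/dhp/continuous_validation/oozie_app/workflow.xml
```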
### Install the project and then deploy and run the workflow
Run the **./installProject.sh** script and then the **./runOozieWorkflow.sh** script.<br>
[...]
Use the "workflow-id" displayed by the "runOozieWorkflow.sh" script to check the running status and logs, in the remote machine, as follows:
- Check the status: `oozie job -oozie http://<cluster's domain and port>/oozie -info <Workflow-ID>`
- Copy the "Job-id" from the output of the above command (numbers with ONE underscore between them).
- Check the job's logs (not the app's logs!): `yarn logs -applicationId application_<Job-ID>`
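
The same checks can be chained in a small shell sketch; all values below are placeholders to be replaced with your cluster's Oozie host/port and the IDs printed by the previous commands:

```bash
# Placeholder values; replace them with your cluster's Oozie host and port,
# the Workflow-ID printed by "runOozieWorkflow.sh" and the Job-ID copied
# from the "oozie ... -info" output.
OOZIE_URL="http://<cluster-host>:<oozie-port>/oozie"
WORKFLOW_ID="<Workflow-ID>"
JOB_ID="<Job-ID>"

# Check the workflow status.
oozie job -oozie "${OOZIE_URL}" -info "${WORKFLOW_ID}"

# Fetch the job's YARN logs (not the app's logs).
yarn logs -applicationId "application_${JOB_ID}"
```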
<br><br>
**Note**:<br>
If you encounter any "java.lang.NoSuchFieldError" issues in the logs, rerun the workflow using the following steps (example commands are given after the list):
- Delete the following remote directories, related to the workflow, under your user's HDFS dir: /user/<userName>/
- ***.sparkStaging***
- ***oozie-oozi***
- Run the **./installProject.sh** script and then the **./runOozieWorkflow.sh** script.
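
A sketch of the cleanup steps above, assuming the standard `hdfs dfs` CLI is available on the remote machine; replace `<userName>` with your own user:

```bash
# Remove the workflow-related directories from the user's HDFS home dir.
# "-skipTrash" deletes them immediately instead of moving them to the trash.
hdfs dfs -rm -r -skipTrash "/user/<userName>/.sparkStaging"
hdfs dfs -rm -r -skipTrash "/user/<userName>/oozie-oozi"

# Then rebuild the project and redeploy/run the workflow.
./installProject.sh && ./runOozieWorkflow.sh
```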

View File

@@ -37,12 +37,6 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<!-- Other dependencies. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -7,7 +7,7 @@ CHOSEN_MAVEN_PROFILE=${DEFAULT_PROFILE}
# Build and deploy this module.
mvn clean package -U ${CHOSEN_MAVEN_PROFILE} -Poozie-package,deploy,run \
-Dworkflow.source.dir=eu/dnetlib/dhp/continuous_validator
-Dworkflow.source.dir=eu/dnetlib/dhp/continuous_validation
# Show the Oozie-job-ID.
echo -e "\n\nShowing the contents of \"extract-and-run-on-remote-host.log\":\n"

View File

@@ -1,13 +1,13 @@
package eu.dnetlib.dhp.continuous_validator;
package eu.dnetlib.dhp.continuous_validation;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.validator2.validation.StandardValidationResult;
import eu.dnetlib.validator2.validation.XMLApplicationProfile;
import eu.dnetlib.validator2.validation.guideline.Guideline;
import eu.dnetlib.validator2.validation.guideline.StandardResult;
import eu.dnetlib.validator2.validation.guideline.openaire.*;
import eu.dnetlib.validator2.validation.utils.TestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -16,23 +16,24 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.validator2.validation.StandardValidationResult;
import eu.dnetlib.validator2.validation.XMLApplicationProfile;
import eu.dnetlib.validator2.validation.guideline.Guideline;
import eu.dnetlib.validator2.validation.guideline.StandardResult;
import eu.dnetlib.validator2.validation.guideline.openaire.*;
import eu.dnetlib.validator2.validation.utils.TestUtils;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class ContinuousValidation {
public class ContinuousValidator {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ContinuousValidator.class);
private static final String parametersFile = "input_continuous_validator_parameters.json";
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ContinuousValidation.class);
private static final String parametersFile = "input_continuous_validation_parameters.json";
public static void main(String[] args) {
ArgumentApplicationParser parser = null;
Boolean isSparkSessionManaged = false;
String parquet_file_path = null;
String parquetPath = null;
String guidelines = null;
String outputPath = null;
@@ -41,8 +42,8 @@ public class ContinuousValidator {
.toString(
Objects
.requireNonNull(
ContinuousValidator.class
.getResourceAsStream("/eu/dnetlib/dhp/continuous_validator/" + parametersFile)),
ContinuousValidation.class
.getResourceAsStream("/eu/dnetlib/dhp/continuous_validation/" + parametersFile)),
StandardCharsets.UTF_8);
parser = new ArgumentApplicationParser(jsonConfiguration);
@@ -62,22 +63,21 @@ public class ContinuousValidator {
// unit test itself rather than inside the spark application"
// Set the parquet input, either a parquet-file or a directory with parquet files.
parquet_file_path = parser.get("parquet_file_path");
if (parquet_file_path == null) {
logger.error("The \"parquet_file_path\" was not retrieved from the parameters file: " + parametersFile);
parquetPath = parser.get("parquet_path");
if (parquetPath == null) {
logger.error("The \"parquet_path\" was not retrieved from the parameters file: " + parametersFile);
return;
}
guidelines = parser.get("openaire_guidelines");
if (guidelines == null) {
logger
.error("The \"openaire_guidelines\" was not retrieved from the parameters file: " + parametersFile);
logger.error("The \"openaire_guidelines\" was not retrieved from the parameters file: " + parametersFile);
return;
}
outputPath = parser.get("outputPath");
outputPath = parser.get("output_path");
if (outputPath == null) {
logger.error("The \"outputPath\" was not retrieved from the parameters file: " + parametersFile);
logger.error("The \"output_path\" was not retrieved from the parameters file: " + parametersFile);
return;
}
@@ -86,8 +86,8 @@ public class ContinuousValidator {
logger
.info(
"Will validate the contents of parquetFile: \"" + parquet_file_path + "\", against guidelines: \""
+ guidelines + "\"" + " and will output the results in: " + outputPath);
"Will validate the contents of parquetFile: \"" + parquetPath + "\", against guidelines: \""
+ guidelines + "\"" + " and will output the results in the outputPath: " + outputPath);
AbstractOpenAireProfile profile;
switch (guidelines) {
@@ -112,13 +112,13 @@ public class ContinuousValidator {
}
SparkConf conf = new SparkConf();
conf.setAppName(ContinuousValidator.class.getSimpleName());
conf.setAppName(ContinuousValidation.class.getSimpleName());
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[] {
XMLApplicationProfile.ValidationResult.class, Guideline.Result.class, StandardValidationResult.class,
StandardResult.class
});
String finalParquet_file_path = parquet_file_path;
String finalParquetPath = parquetPath;
String finalOutputPath = outputPath;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
@@ -132,7 +132,7 @@ public class ContinuousValidator {
spark
.read()
.parquet(finalParquet_file_path)
.parquet(finalParquetPath)
.filter("encoding = 'XML' and id is not NULL and body is not NULL")
.map(validateMapFunction, Encoders.bean(XMLApplicationProfile.ValidationResult.class))
.write()

View File

@@ -7,7 +7,7 @@
},
{
"paramName": "prq_file",
"paramLongName": "parquet_file_path",
"paramLongName": "parquet_path",
"paramDescription": "the full path of a parquet-file or a directory with parquet files, to be processed",
"paramRequired": true
},
@@ -19,7 +19,7 @@
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramLongName": "output_path",
"paramDescription": "the path of the output-directory where the result-json-files will be stored",
"paramRequired": true
}

View File

@@ -1,7 +1,7 @@
<workflow-app name="Validate metadata records against OpenAIRE Guidelines" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>parquet_file_path</name>
<name>parquet_path</name>
<value>/var/lib/dnet/mdstore_PROD/md-7763c517-538d-4aa7-83f8-6096b3ce0d96/md-7763c517-538d-4aa7-83f8-6096b3ce0d96-1702622132535/store</value>
<description>the full path of a parquet-file or a directory with parquet files, to be processed</description>
</property>
@@ -11,8 +11,8 @@
<description>the version of the OpenAIRE Guidelines to validate the records against</description>
</property>
<property>
<name>outputPath</name>
<value>/user/lsmyrnaios/continuous_validator/output</value>
<name>output_path</name>
<value>/user/${dhp.hadoop.frontend.user.name}/continuous_validation/output</value>
<description>the path of the output-directory where the result-json-files will be stored</description>
</property>
<property>
@@ -89,7 +89,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Validate multiple records against OpenAIRE Guidelines</name>
<class>eu.dnetlib.dhp.continuous_validator.ContinuousValidator</class>
<class>eu.dnetlib.dhp.continuous_validation.ContinuousValidation</class>
<jar>dhp-continuous-validation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@@ -103,9 +103,9 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<!-- Arguments passed to the "main" method of the class defined above. -->
<arg>--parquet_file_path</arg><arg>${parquet_file_path}</arg>
<arg>--parquet_path</arg><arg>${parquet_path}</arg>
<arg>--openaire_guidelines</arg><arg>${openaire_guidelines}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--output_path</arg><arg>${output_path}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -4,10 +4,10 @@
<!-- <Appenders>-->
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/ContinuousValidator.log</file>
<file>logs/ContinuousValidation.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>logs/ContinuousValidator.%i.log.zip</fileNamePattern>
<fileNamePattern>logs/ContinuousValidation.%i.log.zip</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
@@ -33,8 +33,7 @@
<appender-ref ref="Console" />
</root>
<!-- TODO - Change the level below to "debug" -->
<logger name="eu.dnetlib.dhp.continuous_validator" level="trace"/>
<logger name="eu.dnetlib.dhp.continuous_validation" level="debug"/>
<logger name="eu.dnetlib.validator2" level="error"/>
<logger name="org.sparkproject" level="info"/>
<logger name="org.apache.spark" level="info"/>