From a0919ed495c61da8d4cecbfa4c322d8f9c95176a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 14 Sep 2022 13:27:39 +0200 Subject: [PATCH] [aggregator graph] save invalid records aside for further inspection --- .../raw/AbstractMdRecordToOafMapper.java | 2 +- .../raw/GenerateEntitiesApplication.java | 37 ++++++++++++++++++- .../graph/generate_entities_parameters.json | 6 +++ .../oa/graph/raw_all/oozie_app/workflow.xml | 2 + .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 2 - .../raw/MigrateDbEntitiesApplicationTest.java | 2 +- 6 files changed, 45 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 984254665..cdc707084 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -107,7 +107,7 @@ public abstract class AbstractMdRecordToOafMapper { this.forceOriginalId = false; } - public List processMdRecord(final String xml) throws DocumentException { + public List processMdRecord(final String xml) { DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); try { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 6bb18c375..e9de43f7f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -16,6 +16,9 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; @@ -76,6 +79,9 @@ public class GenerateEntitiesApplication { final String targetPath = parser.get("targetPath"); log.info("targetPath: {}", targetPath); + final String invalidPath = parser.get("invalidPath"); + log.info("invalidPath: {}", invalidPath); + final String isLookupUrl = parser.get("isLookupUrl"); log.info("isLookupUrl: {}", isLookupUrl); @@ -97,7 +103,8 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); - generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode); + HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration()); + generateEntities(spark, vocs, sourcePaths, targetPath, invalidPath, shouldHashId, mode); }); } @@ -106,6 +113,7 @@ public class GenerateEntitiesApplication { final VocabularyGroup vocs, final String sourcePaths, final String targetPath, + final String invalidPath, final boolean shouldHashId, final Mode mode) { @@ -121,6 +129,19 @@ public class GenerateEntitiesApplication { JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { + RDD invalidRecords = sc + .sequenceFile(sp, Text.class, Text.class) + .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) + .map(k -> tryApplyMapping(k._1(), k._2(), shouldHashId, vocs)) + .filter(Objects::nonNull) + .rdd(); + spark + .createDataset(invalidRecords, Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(invalidPath); + inputRdd = inputRdd .union( sc @@ -159,7 +180,7 @@ public class GenerateEntitiesApplication { final String id, final String s, final boolean shouldHashId, - final VocabularyGroup vocs) throws DocumentException { + final VocabularyGroup vocs) { final String type = StringUtils.substringAfter(id, ":"); switch (type.toLowerCase()) { @@ -196,6 +217,18 @@ public class GenerateEntitiesApplication { } } + private static String tryApplyMapping( + final String id, + final String s, + final boolean shouldHashId, + final VocabularyGroup vocs) { + + if (convertToListOaf(id, s, shouldHashId, vocs).isEmpty()) { + return s; + } + return null; + } + private static Oaf convertFromJson(final String s, final Class clazz) { try { return OBJECT_MAPPER.readValue(s, clazz); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json index 52cbbf45f..da6730fbb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json @@ -17,6 +17,12 @@ "paramDescription": "the path of the target file", "paramRequired": true }, + { + "paramName": "i", + "paramLongName": "invalidPath", + "paramDescription": "the path of the invalid records file", + "paramRequired": false + }, { "paramName": "isu", "paramLongName": "isLookupUrl", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index c6cc46c0f..d00232e9a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -468,6 +468,7 @@ --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims --targetPath${workingDir}/entities_claim + --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} --modeclaim @@ -517,6 +518,7 @@ --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible --targetPath${workingDir}/entities + --invalidPath${workingDir}/invalid_records --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 8165ad757..204649633 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -12,7 +12,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import kotlin.jvm.Throws; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.dom4j.DocumentException; @@ -22,7 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 408196665..11947dbe7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -18,7 +18,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;