From b99a01134545beb75cf3a4114575be61c9c75486 Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Tue, 13 Sep 2022 11:51:55 +0200 Subject: [PATCH 1/6] return empty Oaf list if record cannot be parsed --- .../dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java | 2 +- .../test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index e263cffa8..984254665 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -143,7 +143,7 @@ public abstract class AbstractMdRecordToOafMapper { return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); } catch (DocumentException e) { log.error("Error with record:\n" + xml); - throw e; + return Lists.newArrayList(); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index c4e34a9a8..8165ad757 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -931,11 +931,8 @@ class MappersTest { void testNotWellFormed() throws IOException, DocumentException { final String xml = IOUtils .toString(Objects.requireNonNull(getClass().getResourceAsStream("oaf_notwellformed.xml"))); - final DocumentException generalEx = new DocumentException(); + assertEquals(0, new OafToOafMapper(vocs, false, true).processMdRecord(xml).size()); - DocumentException exception = assertThrows(DocumentException.class, () -> { - new OafToOafMapper(vocs, false, true).processMdRecord(xml); - }); } private void assertValidId(final String id) { From a0919ed495c61da8d4cecbfa4c322d8f9c95176a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 14 Sep 2022 13:27:39 +0200 Subject: [PATCH 2/6] [aggregator graph] save invalid records aside for further inspection --- .../raw/AbstractMdRecordToOafMapper.java | 2 +- .../raw/GenerateEntitiesApplication.java | 37 ++++++++++++++++++- .../graph/generate_entities_parameters.json | 6 +++ .../oa/graph/raw_all/oozie_app/workflow.xml | 2 + .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 2 - .../raw/MigrateDbEntitiesApplicationTest.java | 2 +- 6 files changed, 45 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 984254665..cdc707084 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -107,7 +107,7 @@ public abstract class AbstractMdRecordToOafMapper { this.forceOriginalId = false; } - public List processMdRecord(final String xml) throws DocumentException { + public List processMdRecord(final String xml) { DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); try { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 6bb18c375..e9de43f7f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -16,6 +16,9 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; @@ -76,6 +79,9 @@ public class GenerateEntitiesApplication { final String targetPath = parser.get("targetPath"); log.info("targetPath: {}", targetPath); + final String invalidPath = parser.get("invalidPath"); + log.info("invalidPath: {}", invalidPath); + final String isLookupUrl = parser.get("isLookupUrl"); log.info("isLookupUrl: {}", isLookupUrl); @@ -97,7 +103,8 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); - generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode); + HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration()); + generateEntities(spark, vocs, sourcePaths, targetPath, invalidPath, shouldHashId, mode); }); } @@ -106,6 +113,7 @@ public class GenerateEntitiesApplication { final VocabularyGroup vocs, final String sourcePaths, final String targetPath, + final String invalidPath, final boolean shouldHashId, final Mode mode) { @@ -121,6 +129,19 @@ public class GenerateEntitiesApplication { JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { + RDD invalidRecords = sc + .sequenceFile(sp, Text.class, Text.class) + .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) + .map(k -> tryApplyMapping(k._1(), k._2(), shouldHashId, vocs)) + .filter(Objects::nonNull) + .rdd(); + spark + .createDataset(invalidRecords, Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(invalidPath); + inputRdd = inputRdd .union( sc @@ -159,7 +180,7 @@ public class GenerateEntitiesApplication { final String id, final String s, final boolean shouldHashId, - final VocabularyGroup vocs) throws DocumentException { + final VocabularyGroup vocs) { final String type = StringUtils.substringAfter(id, ":"); switch (type.toLowerCase()) { @@ -196,6 +217,18 @@ public class GenerateEntitiesApplication { } } + private static String tryApplyMapping( + final String id, + final String s, + final boolean shouldHashId, + final VocabularyGroup vocs) { + + if (convertToListOaf(id, s, shouldHashId, vocs).isEmpty()) { + return s; + } + return null; + } + private static Oaf convertFromJson(final String s, final Class clazz) { try { return OBJECT_MAPPER.readValue(s, clazz); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json index 52cbbf45f..da6730fbb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json @@ -17,6 +17,12 @@ "paramDescription": "the path of the target file", "paramRequired": true }, + { + "paramName": "i", + "paramLongName": "invalidPath", + "paramDescription": "the path of the invalid records file", + "paramRequired": false + }, { "paramName": "isu", "paramLongName": "isLookupUrl", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index c6cc46c0f..d00232e9a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -468,6 +468,7 @@ --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims --targetPath${workingDir}/entities_claim + --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} --modeclaim @@ -517,6 +518,7 @@ --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible --targetPath${workingDir}/entities + --invalidPath${workingDir}/invalid_records --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 8165ad757..204649633 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -12,7 +12,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import kotlin.jvm.Throws; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.dom4j.DocumentException; @@ -22,7 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 408196665..11947dbe7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -18,7 +18,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; From c48f6e9c57c6b1fe549d59e032bc0cad6fa01f66 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 14 Sep 2022 17:11:26 +0200 Subject: [PATCH 3/6] [aggregator graph] save invalid records aside for further inspection --- .../dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java | 2 +- .../dhp/oa/graph/raw/GenerateEntitiesApplication.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index cdc707084..0a32766c9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -143,7 +143,7 @@ public abstract class AbstractMdRecordToOafMapper { return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); } catch (DocumentException e) { log.error("Error with record:\n" + xml); - return Lists.newArrayList(); + return null; } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index e9de43f7f..290a22656 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -126,8 +126,6 @@ public class GenerateEntitiesApplication { log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); - JavaRDD inputRdd = sc.emptyRDD(); - for (final String sp : existingSourcePaths) { RDD invalidRecords = sc .sequenceFile(sp, Text.class, Text.class) @@ -141,7 +139,11 @@ public class GenerateEntitiesApplication { .mode(SaveMode.Append) .option("compression", "gzip") .text(invalidPath); + } + JavaRDD inputRdd = sc.emptyRDD(); + + for (final String sp : existingSourcePaths) { inputRdd = inputRdd .union( sc @@ -223,7 +225,7 @@ public class GenerateEntitiesApplication { final boolean shouldHashId, final VocabularyGroup vocs) { - if (convertToListOaf(id, s, shouldHashId, vocs).isEmpty()) { + if (Objects.isNull(convertToListOaf(id, s, shouldHashId, vocs))) { return s; } return null; From 9e7ec4198fb99514b2f5e5f1c7817ffa7d0c1f8e Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Wed, 14 Sep 2022 18:08:56 +0200 Subject: [PATCH 4/6] fixed test --- .../src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 204649633..390920027 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -929,7 +929,7 @@ class MappersTest { void testNotWellFormed() throws IOException, DocumentException { final String xml = IOUtils .toString(Objects.requireNonNull(getClass().getResourceAsStream("oaf_notwellformed.xml"))); - assertEquals(0, new OafToOafMapper(vocs, false, true).processMdRecord(xml).size()); + assertEquals(null, new OafToOafMapper(vocs, false, true).processMdRecord(xml)); } From 1e42d984e1ead5e1e7519d69f8150f44060f9565 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 15 Sep 2022 10:49:42 +0200 Subject: [PATCH 5/6] [aggregator graph] save invalid records aside for further inspection --- .../dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java | 2 +- .../dhp/oa/graph/raw/GenerateEntitiesApplication.java | 10 +++++----- .../java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java | 7 ++++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 0a32766c9..cdc707084 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -143,7 +143,7 @@ public abstract class AbstractMdRecordToOafMapper { return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); } catch (DocumentException e) { log.error("Error with record:\n" + xml); - return null; + return Lists.newArrayList(); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 290a22656..06d5e9acb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -150,8 +150,8 @@ public class GenerateEntitiesApplication { .sequenceFile(sp, Text.class, Text.class) .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) .map(k -> convertToListOaf(k._1(), k._2(), shouldHashId, vocs)) - .filter(Objects::nonNull) - .flatMap(List::iterator)); + .flatMap(List::iterator) + .filter(Objects::nonNull)); } switch (mode) { @@ -225,7 +225,8 @@ public class GenerateEntitiesApplication { final boolean shouldHashId, final VocabularyGroup vocs) { - if (Objects.isNull(convertToListOaf(id, s, shouldHashId, vocs))) { + final List oaf = convertToListOaf(id, s, shouldHashId, vocs); + if (Optional.ofNullable(oaf).map(List::isEmpty).orElse(false)) { return s; } return null; @@ -235,8 +236,7 @@ public class GenerateEntitiesApplication { try { return OBJECT_MAPPER.readValue(s, clazz); } catch (final Exception e) { - log.error("Error parsing object of class: {}", clazz); - log.error(s); + log.error("Error parsing object of class: {}:\n{}", clazz, s); throw new IllegalArgumentException(e); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 390920027..506a69012 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -926,11 +926,12 @@ class MappersTest { } @Test - void testNotWellFormed() throws IOException, DocumentException { + void testNotWellFormed() throws IOException { final String xml = IOUtils .toString(Objects.requireNonNull(getClass().getResourceAsStream("oaf_notwellformed.xml"))); - assertEquals(null, new OafToOafMapper(vocs, false, true).processMdRecord(xml)); - + final List actual = new OafToOafMapper(vocs, false, true).processMdRecord(xml); + assertNotNull(actual); + assertTrue(actual.isEmpty()); } private void assertValidId(final String id) { From e370e940d871b5b9772eff48300ad8d9074641c3 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 16 Sep 2022 14:06:28 +0200 Subject: [PATCH 6/6] [aggregator graph] save invalid records aside for further inspection --- .../raw/GenerateEntitiesApplication.java | 37 +----- .../graph/raw/VerifyRecordsApplication.java | 108 ++++++++++++++++++ .../common/AbstractMigrationApplication.java | 6 +- .../graph/generate_entities_parameters.json | 6 - .../oa/graph/raw_all/oozie_app/workflow.xml | 54 ++++++++- .../oa/graph/verify_records_parameters.json | 26 +++++ 6 files changed, 191 insertions(+), 46 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 06d5e9acb..5f9d98073 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -79,9 +79,6 @@ public class GenerateEntitiesApplication { final String targetPath = parser.get("targetPath"); log.info("targetPath: {}", targetPath); - final String invalidPath = parser.get("invalidPath"); - log.info("invalidPath: {}", invalidPath); - final String isLookupUrl = parser.get("isLookupUrl"); log.info("isLookupUrl: {}", isLookupUrl); @@ -103,8 +100,7 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); - HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration()); - generateEntities(spark, vocs, sourcePaths, targetPath, invalidPath, shouldHashId, mode); + generateEntities(spark, vocs, sourcePaths, targetPath, shouldHashId, mode); }); } @@ -113,7 +109,6 @@ public class GenerateEntitiesApplication { final VocabularyGroup vocs, final String sourcePaths, final String targetPath, - final String invalidPath, final boolean shouldHashId, final Mode mode) { @@ -126,21 +121,6 @@ public class GenerateEntitiesApplication { log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); - for (final String sp : existingSourcePaths) { - RDD invalidRecords = sc - .sequenceFile(sp, Text.class, Text.class) - .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) - .map(k -> tryApplyMapping(k._1(), k._2(), shouldHashId, vocs)) - .filter(Objects::nonNull) - .rdd(); - spark - .createDataset(invalidRecords, Encoders.STRING()) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .text(invalidPath); - } - JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { @@ -178,7 +158,7 @@ public class GenerateEntitiesApplication { .saveAsTextFile(targetPath, GzipCodec.class); } - private static List convertToListOaf( + public static List convertToListOaf( final String id, final String s, final boolean shouldHashId, @@ -219,19 +199,6 @@ public class GenerateEntitiesApplication { } } - private static String tryApplyMapping( - final String id, - final String s, - final boolean shouldHashId, - final VocabularyGroup vocs) { - - final List oaf = convertToListOaf(id, s, shouldHashId, vocs); - if (Optional.ofNullable(oaf).map(List::isEmpty).orElse(false)) { - return s; - } - return null; - } - private static Oaf convertFromJson(final String s, final Class clazz) { try { return OBJECT_MAPPER.readValue(s, clazz); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java new file mode 100644 index 000000000..a8eb871c8 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java @@ -0,0 +1,108 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; + +public class VerifyRecordsApplication { + + private static final Logger log = LoggerFactory.getLogger(VerifyRecordsApplication.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + VerifyRecordsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json"))); + + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String sourcePaths = parser.get("sourcePaths"); + log.info("sourcePaths: {}", sourcePaths); + + final String invalidPath = parser.get("invalidPath"); + log.info("invalidPath: {}", invalidPath); + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); + final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); + + final SparkConf conf = new SparkConf(); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration()); + validateRecords(spark, sourcePaths, invalidPath, vocs); + }); + } + + private static void validateRecords(SparkSession spark, String sourcePaths, String invalidPath, + VocabularyGroup vocs) { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final List existingSourcePaths = Arrays + .stream(sourcePaths.split(",")) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) + .collect(Collectors.toList()); + + log.info("Verify records in files:"); + existingSourcePaths.forEach(log::info); + + for (final String sp : existingSourcePaths) { + RDD invalidRecords = sc + .sequenceFile(sp, Text.class, Text.class) + .map(k -> tryApplyMapping(k._1().toString(), k._2().toString(), true, vocs)) + .filter(Objects::nonNull) + .rdd(); + spark + .createDataset(invalidRecords, Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(invalidPath); + } + } + + private static String tryApplyMapping( + final String id, + final String xmlRecord, + final boolean shouldHashId, + final VocabularyGroup vocs) { + + final List oaf = GenerateEntitiesApplication.convertToListOaf(id, xmlRecord, shouldHashId, vocs); + if (Optional.ofNullable(oaf).map(List::isEmpty).orElse(false)) { + return xmlRecord; + } + return null; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index cba64899b..6f63e9327 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -24,8 +25,11 @@ import org.apache.http.impl.client.HttpClients; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.OafToOafMapper; +import eu.dnetlib.dhp.oa.graph.raw.OdfToOafMapper; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; -import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.DHPUtils; public class AbstractMigrationApplication implements Closeable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json index da6730fbb..52cbbf45f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json @@ -17,12 +17,6 @@ "paramDescription": "the path of the target file", "paramRequired": true }, - { - "paramName": "i", - "paramLongName": "invalidPath", - "paramDescription": "the path of the invalid records file", - "paramRequired": false - }, { "paramName": "isu", "paramLongName": "isLookupUrl", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index d00232e9a..8262c6923 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -446,10 +446,34 @@ - - + + + + + yarn + cluster + VerifyRecords_claim + eu.dnetlib.dhp.oa.graph.raw.VerifyRecordsApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims + --invalidPath${workingDir}/invalid_records_claim + --isLookupUrl${isLookupUrl} + + + + + yarn @@ -468,7 +492,6 @@ --sourcePaths${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims --targetPath${workingDir}/entities_claim - --invalidPath${workingDir}/invalid_records_claim --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} --modeclaim @@ -500,6 +523,30 @@ + + + yarn + cluster + VerifyRecords + eu.dnetlib.dhp.oa.graph.raw.VerifyRecordsApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible + --invalidPath${workingDir}/invalid_records + --isLookupUrl${isLookupUrl} + + + + + yarn @@ -518,7 +565,6 @@ --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible --targetPath${workingDir}/entities - --invalidPath${workingDir}/invalid_records --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json new file mode 100644 index 000000000..eb00e7609 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "s", + "paramLongName": "sourcePaths", + "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "invalidPath", + "paramDescription": "the path of the invalid records file", + "paramRequired": false + }, + { + "paramName": "isu", + "paramLongName": "isLookupUrl", + "paramDescription": "the url of the ISLookupService", + "paramRequired": true + } +] \ No newline at end of file