From a4c52661a01f8534da611804ec652ac7cf9dc4e5 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 20 Mar 2020 19:17:24 +0100 Subject: [PATCH] WIP: fixing dedup workflows --- dhp-workflows/dhp-dedup/pom.xml | 9 +++++++++ .../java/eu/dnetlib/dhp/dedup/DedupRecordFactory.java | 2 -- .../eu/dnetlib/dhp/dedup/SparkCreateDedupRecord.java | 5 ++++- .../java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java | 8 +++++++- .../eu/dnetlib/dhp/dedup/createSimRels_parameters.json | 2 +- .../eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml | 4 ++++ 6 files changed, 25 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml index f39bf62f0..691fbe6d5 100644 --- a/dhp-workflows/dhp-dedup/pom.xml +++ b/dhp-workflows/dhp-dedup/pom.xml @@ -65,6 +65,15 @@ com.arakelian java-jq + + dom4j + dom4j + + + jaxen + jaxen + + eu.dnetlib diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/DedupRecordFactory.java index 2fcac45fa..583e90ab9 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/DedupRecordFactory.java @@ -15,8 +15,6 @@ import scala.Tuple2; import java.util.Collection; -import static java.util.stream.Collectors.toMap; - public class DedupRecordFactory { public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) { diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateDedupRecord.java index 0ce12d10a..51d0760e0 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateDedupRecord.java @@ -34,8 +34,11 @@ public class SparkCreateDedupRecord { for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { String subEntity = dedupConf.getWf().getSubEntityValue(); + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); + final OafEntityType entityType = OafEntityType.valueOf(subEntity); final JavaRDD dedupRecord = - DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity), DedupUtility.createEntityPath(graphBasePath, subEntity), OafEntityType.valueOf(subEntity), dedupConf); + DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf); dedupRecord.map(r -> { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(r); diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java index 8298f9867..18d0d4ee6 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java @@ -47,6 +47,12 @@ public class SparkCreateSimRels implements Serializable { final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); + System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); + System.out.println(String.format("rawSet: '%s'", rawSet)); + System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); + System.out.println(String.format("actionSetId: '%s'", actionSetId)); + System.out.println(String.format("workingPath: '%s'", workingPath)); + try (SparkSession spark = getSparkSession(parser)) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -58,7 +64,7 @@ public class SparkCreateSimRels implements Serializable { final String entity = dedupConf.getWf().getEntityType(); final String subEntity = dedupConf.getWf().getSubEntityValue(); - JavaPairRDD mapDocument = sc.textFile(graphBasePath + "/" + subEntity) + JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .mapToPair(s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); return new Tuple2<>(d.getIdentifier(), d); diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json index 83a030159..b8c8af699 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json @@ -19,7 +19,7 @@ }, { "paramName": "i", - "paramLongName": "rawGraphBasePath", + "paramLongName": "graphBasePath", "paramDescription": "the base path of the raw graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml index 01498ce04..35ed28103 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml @@ -65,6 +65,10 @@ + + + + yarn cluster Create Similarity Relations