From f7890a90dfd46e13902077fd4c01ebdc445b24d9 Mon Sep 17 00:00:00 2001 From: miconis Date: Mon, 23 Mar 2020 17:13:30 +0100 Subject: [PATCH] implementation of the mechanism that checks the existance of a mergerel file --- .../dnetlib/dhp/dedup/SparkUpdateEntity.java | 102 ++++++++++-------- .../config-default.xml | 0 .../workflow.xml | 43 +++++++- .../dhp/dedup/roots/oozie_app/workflow.xml | 38 ++----- .../dhp/dedup/updateEntity_parameters.json | 12 --- .../dhp/dedup/SparkCreateDedupTest.java | 36 +++++-- 6 files changed, 137 insertions(+), 94 deletions(-) rename dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/{relations/oozie_app => consistency.oozie_app}/config-default.xml (100%) rename dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/{relations/oozie_app => consistency.oozie_app}/workflow.xml (56%) diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java index 0c9890b03..dd079e4cd 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java @@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; @@ -18,13 +18,14 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; import scala.Tuple2; import java.io.IOException; public class SparkUpdateEntity { + final String IDJSONPATH = "$.id"; + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json"))); parser.parseArgument(args); @@ -32,65 +33,82 @@ public class SparkUpdateEntity { new SparkUpdateEntity().run(parser); } - public void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { + public boolean mergeRelExists(String basePath, String entity) throws IOException { + + boolean result = false; + + FileSystem fileSystem = FileSystem.get(new Configuration()); + + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath)); + + for (FileStatus fs : fileStatuses) { + if (fs.isDirectory()) + if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity)))) + result = true; + } + + return result; + } + + public void run(ArgumentApplicationParser parser) throws IOException { final String graphBasePath = parser.get("graphBasePath"); final String workingPath = parser.get("workingPath"); final String dedupGraphPath = parser.get("dedupGraphPath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); try (SparkSession spark = getSparkSession(parser)) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { + //for each entity + for (OafEntityType entity: OafEntityType.values()) { - String subEntity = dedupConf.getWf().getSubEntityValue(); + JavaRDD sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString())); - final Dataset df = spark.read().load(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = df - .where("relClass == 'merges'") - .select(df.col("target")) - .distinct() - .toJavaRDD() - .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); + if (mergeRelExists(workingPath, entity.toString())) { - final JavaRDD sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)); + final Dataset rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", entity.toString())).as(Encoders.bean(Relation.class)); - final JavaRDD dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity)); + final JavaPairRDD mergedIds = rel + .where("relClass == 'merges'") + .select(rel.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); - JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s), s)); + final JavaRDD dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString())); - Class mainClass; - switch (subEntity) { - case "publication": - mainClass = Publication.class; - break; - case "dataset": - mainClass = eu.dnetlib.dhp.schema.oaf.Dataset.class; - break; - case "datasource": - mainClass = Datasource.class; - break; - case "software": - mainClass = Software.class; - break; - case "organization": - mainClass = Organization.class; - break; - case "otherresearchproduct": - mainClass = OtherResearchProduct.class; - break; - default: - throw new IllegalArgumentException("Illegal type " + subEntity); + JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); + + JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1()); + sourceEntity = map.union(dedupEntity); } - JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1()); - map.union(dedupEntity).saveAsTextFile(dedupGraphPath + "/" + subEntity, GzipCodec.class); + sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class); + } } + } + public Class getOafClass(OafEntityType className) { + switch (className.toString()) { + case "publication": + return Publication.class; + case "dataset": + return eu.dnetlib.dhp.schema.oaf.Dataset.class; + case "datasource": + return Datasource.class; + case "software": + return Software.class; + case "organization": + return Organization.class; + case "otherresearchproduct": + return OtherResearchProduct.class; + case "project": + return Project.class; + default: + throw new IllegalArgumentException("Illegal type " + className); + } } private static String updateDeletedByInference(final String json, final Class clazz) { diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/relations/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency.oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/relations/oozie_app/config-default.xml rename to dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency.oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/relations/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency.oozie_app/workflow.xml similarity index 56% rename from dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/relations/oozie_app/workflow.xml rename to dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency.oozie_app/workflow.xml index 5be481057..5728e6ad8 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/relations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency.oozie_app/workflow.xml @@ -1,12 +1,20 @@ - + graphBasePath the raw graph base path + + isLookUpUrl + the address of the lookUp service + + + actionSetId + id of the actionSet + workingPath - path for the working directory + path of the working directory dedupGraphPath @@ -26,12 +34,41 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Create Dedup Record + eu.dnetlib.dhp.dedup.SparkUpdateEntity + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --o${dedupGraphPath} + + + + + diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml index 3ef79e04f..984e8ed48 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml @@ -82,11 +82,13 @@ Create Dedup Record eu.dnetlib.dhp.dedup.SparkCreateDedupRecord dhp-dedup-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} --conf - spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf - spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf - spark.sql.warehouse.dir="/user/hive/warehouse" + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mtyarn-cluster --i${graphBasePath} @@ -94,32 +96,6 @@ --la${isLookUpUrl} --asi${actionSetId} - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Create Dedup Record - eu.dnetlib.dhp.dedup.SparkUpdateEntity - dhp-dedup-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} --conf - spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf - spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf - spark.sql.warehouse.dir="/user/hive/warehouse" - - -mtyarn-cluster - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} - --o${dedupGraphPath} - diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json index 76aea0537..06b67f732 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json @@ -18,18 +18,6 @@ "paramRequired": true }, { - "paramName": "la", - "paramLongName": "isLookUpUrl", - "paramDescriptions": "the url of the lookup service", - "paramRequired": true -}, -{ - "paramName": "asi", - "paramLongName": "actionSetId", - "paramDescriptions": "the id of the actionset (orchestrator)", - "paramRequired": true -}, - { "paramName": "o", "paramLongName": "dedupGraphPath", "paramDescription": "the path of the dedup graph", diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java index 47e446e7a..ebc139867 100644 --- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java +++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java @@ -4,6 +4,8 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -17,13 +19,14 @@ public class SparkCreateDedupTest { @Before public void setUp() throws IOException { - configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json")); +// configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json")); + configuration = ""; } @Test @Ignore public void createSimRelsTest2() throws Exception { - SparkCreateSimRels.main(new String[] { + SparkCreateSimRels.main(new String[]{ "-mt", "local[*]", "-s", "/Users/miconis/dumps", "-e", entity, @@ -40,7 +43,7 @@ public class SparkCreateDedupTest { @Ignore public void createCCTest() throws Exception { - SparkCreateConnectedComponent.main(new String[] { + SparkCreateConnectedComponent.main(new String[]{ "-mt", "local[*]", "-s", "/Users/miconis/dumps", "-e", entity, @@ -52,7 +55,7 @@ public class SparkCreateDedupTest { @Test @Ignore public void dedupRecordTest() throws Exception { - SparkCreateDedupRecord.main(new String[] { + SparkCreateDedupRecord.main(new String[]{ "-mt", "local[*]", "-s", "/Users/miconis/dumps", "-e", entity, @@ -62,21 +65,42 @@ public class SparkCreateDedupTest { } @Test + @Ignore public void printConfiguration() throws Exception { System.out.println(ArgumentApplicationParser.compressArgument(configuration)); } @Test + @Ignore public void testHashCode() { final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f"; final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46"; final HashFunction hashFunction = Hashing.murmur3_128(); - System.out.println( s1.hashCode()); + System.out.println(s1.hashCode()); System.out.println(hashFunction.hashString(s1).asLong()); - System.out.println( s2.hashCode()); + System.out.println(s2.hashCode()); System.out.println(hashFunction.hashString(s2).asLong()); } + @Test + public void fileExistsTest() throws IOException { + + boolean result = false; + + FileSystem fileSystem = FileSystem.get(new Configuration()); + + FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/tmp")); + + for (FileStatus fs : fileStatuses) { + if (fs.isDirectory()) { + if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath("/tmp", fs.getPath().getName(), "cicciopasticcio")))) { + System.out.println("fs = " + DedupUtility.createMergeRelPath("/tmp", fs.getPath().getName(), "cicciopasticcio")); + result = true; + } + } + } + + } }