From e16e644faff7d30222dbfd358421941a6e56e411 Mon Sep 17 00:00:00 2001
From: miconis
Date: Fri, 20 Mar 2020 13:01:56 +0100
Subject: [PATCH] implementation of the workflow for entity update and for relations update

---
 .../java/eu/dnetlib/dedup/DedupUtility.java   |   4 +
 .../dedup/SparkCreateConnectedComponent.java  | 100 +++++++----
 .../dedup/SparkCreateConnectedComponent2.java | 100 -----------
 .../dnetlib/dedup/SparkCreateDedupRecord.java |  56 ++++--
 .../eu/dnetlib/dedup/SparkCreateSimRels.java  | 152 +++++++++++-----
 .../eu/dnetlib/dedup/SparkCreateSimRels2.java | 144 ---------------
 .../dnetlib/dedup/SparkPropagateRelation.java | 169 ++++++++++++++++++
 .../eu/dnetlib/dedup/SparkUpdateEntity.java   | 121 +++++++++++++
 .../dedup/createDedupRecord_parameters.json  |  32 ++++
 .../dedup/oozie_app/BuildRootRecordsWf.xml   | 129 +++++++++++++
 .../dhp/dedup/oozie_app/DuplicateScanWf.xml   |   5 +-
 .../dhp/dedup/oozie_app/UpdateRelationsWf.xml |  68 +++++++
 .../dedup/propagateRelation_parameters.json  |  26 +++
 .../dhp/dedup/updateEntity_parameters.json    |  38 ++++
 .../dnetlib/dedup/SparkCreateDedupTest.java   |  31 ++--
 15 files changed, 809 insertions(+), 366 deletions(-)
 delete mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java
 delete mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelation.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntity.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/BuildRootRecordsWf.xml
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/UpdateRelationsWf.xml
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json

diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
index 94a328533..ca390743e 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
@@ -122,6 +122,10 @@ public class DedupUtility {
         });
     }
 
+    public static String createDedupRecordPath(final String basePath, final String actionSetId, final String entityType) {
+        return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);
+    }
+
     public static String createEntityPath(final String basePath, final String entityType) {
         return String.format("%s/%s", basePath, entityType);
     }
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
index bdfd2c572..411913cdf 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
@@ -5,9 +5,11 @@ import eu.dnetlib.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dedup.graph.GraphProcessor;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -18,6 +20,7 @@ import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; import scala.Tuple2; import java.util.ArrayList; @@ -26,51 +29,72 @@ import java.util.List; public class SparkCreateConnectedComponent { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json"))); parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkCreateConnectedComponent.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String entity = parser.get("entity"); - final String targetPath = parser.get("targetPath"); -// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json"))); - final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); + new SparkCreateConnectedComponent().run(parser); + } - final JavaPairRDD vertexes = sc.textFile(inputPath + "/" + entity) - .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) - .mapToPair((PairFunction) - s -> new Tuple2(getHashcode(s), s) - ); + private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { - final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath, "",entity)).as(Encoders.bean(Relation.class)); - final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); - final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); - final Dataset mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction) c -> - c.getDocIds() - .stream() - .flatMap(id -> { - List tmp = new ArrayList<>(); - Relation r = new Relation(); - r.setSource(c.getCcId()); - r.setTarget(id); - r.setRelClass("merges"); - tmp.add(r); - r = new Relation(); - r.setTarget(c.getCcId()); - r.setSource(id); - r.setRelClass("isMergedIn"); - tmp.add(r); - return tmp.stream(); - }).iterator()).rdd(), Encoders.bean(Relation.class)); - mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,"",entity)); + final String graphBasePath = parser.get("graphBasePath"); + final String workingPath = parser.get("workingPath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + + try (SparkSession spark = getSparkSession(parser)) { + + final 
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { + + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + + final JavaPairRDD vertexes = sc.textFile(graphBasePath + "/" + subEntity) + .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) + .mapToPair((PairFunction) + s -> new Tuple2(getHashcode(s), s) + ); + + final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); + final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); + final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); + final Dataset mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction) c -> + c.getDocIds() + .stream() + .flatMap(id -> { + List tmp = new ArrayList<>(); + Relation r = new Relation(); + r.setSource(c.getCcId()); + r.setTarget(id); + r.setRelClass("merges"); + tmp.add(r); + r = new Relation(); + r.setTarget(c.getCcId()); + r.setSource(id); + r.setRelClass("isMergedIn"); + tmp.add(r); + return tmp.stream(); + }).iterator()).rdd(), Encoders.bean(Relation.class)); + mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity)); + } + } } public static long getHashcode(final String id) { return Hashing.murmur3_128().hashUnencodedChars(id).asLong(); } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession + .builder() + .appName(SparkCreateSimRels.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + } } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java deleted file mode 100644 index ad3f6efc0..000000000 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java +++ /dev/null @@ -1,100 +0,0 @@ -package eu.dnetlib.dedup; - -import com.google.common.hash.Hashing; -import eu.dnetlib.dedup.graph.ConnectedComponent; -import eu.dnetlib.dedup.graph.GraphProcessor; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.util.MapDocumentUtil; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.graphx.Edge; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; -import scala.Tuple2; - -import java.util.ArrayList; -import java.util.List; - -public class SparkCreateConnectedComponent2 { - - public static void main(String[] args) throws 
Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json"))); - parser.parseArgument(args); - - new SparkCreateConnectedComponent2().run(parser); - } - - private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { - - final String graphBasePath = parser.get("graphBasePath"); - final String workingPath = parser.get("workingPath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - - try (SparkSession spark = getSparkSession(parser)) { - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - - for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { - final String entity = dedupConf.getWf().getEntityType(); - final String subEntity = dedupConf.getWf().getSubEntityValue(); - - final JavaPairRDD vertexes = sc.textFile(graphBasePath + "/" + subEntity) - .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) - .mapToPair((PairFunction) - s -> new Tuple2(getHashcode(s), s) - ); - - final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); - final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); - final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); - final Dataset mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction) c -> - c.getDocIds() - .stream() - .flatMap(id -> { - List tmp = new ArrayList<>(); - Relation r = new Relation(); - r.setSource(c.getCcId()); - r.setTarget(id); - r.setRelClass("merges"); - tmp.add(r); - r = new Relation(); - r.setTarget(c.getCcId()); - r.setSource(id); - r.setRelClass("isMergedIn"); - tmp.add(r); - return tmp.stream(); - }).iterator()).rdd(), Encoders.bean(Relation.class)); - mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity)); - } - } - } - - public static long getHashcode(final String id) { - return Hashing.murmur3_128().hashUnencodedChars(id).asLong(); - } - - private static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - - return SparkSession - .builder() - .appName(SparkCreateSimRels2.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - } -} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java index db2306526..77c8e04e9 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java @@ -3,37 +3,57 @@ package eu.dnetlib.dedup; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import 
org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; public class SparkCreateDedupRecord { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json"))); parser.parseArgument(args); - final SparkSession spark = SparkSession + + new SparkCreateDedupRecord().run(parser); + } + + private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { + + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + + try (SparkSession spark = getSparkSession(parser)) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { + String subEntity = dedupConf.getWf().getSubEntityValue(); + + final JavaRDD dedupRecord = + DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity), DedupUtility.createEntityPath(graphBasePath, subEntity), OafEntityType.valueOf(subEntity), dedupConf); + dedupRecord.map(r -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(r); + }).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity)); + } + } + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession .builder() .appName(SparkCreateDedupRecord.class.getSimpleName()) .master(parser.get("master")) + .config(conf) + .enableHiveSupport() .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String sourcePath = parser.get("sourcePath"); - final String entity = parser.get("entity"); - final String dedupPath = parser.get("dedupPath"); -// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json"))); - final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); - - final JavaRDD dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf); - dedupRecord.map(r-> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(r); - }).saveAsTextFile(dedupPath+"/"+entity+"_dedup_record_json"); - - } - } + diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java index 543dae8e9..4f25d620b 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java @@ -1,71 +1,135 @@ package eu.dnetlib.dedup; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import 
eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; import scala.Tuple2; +import java.io.Serializable; import java.util.List; +public class SparkCreateSimRels implements Serializable { -/** - * This Spark class creates similarity relations between entities, saving result - * - * param request: - * sourcePath - * entityType - * target Path - */ -public class SparkCreateSimRels { + private static final Log log = LogFactory.getLog(SparkCreateSimRels.class); public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json"))); parser.parseArgument(args); - final SparkSession spark = SparkSession + + new SparkCreateSimRels().run(parser); + } + + private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { + + //read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String rawSet = parser.get("rawSet"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + + try (SparkSession spark = getSparkSession(parser)) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + //create empty sequenceFile for the accumulation + JavaRDD> simRel = sc.emptyRDD(); + + //for each dedup configuration + for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + + JavaPairRDD mapDocument = sc.textFile(graphBasePath + "/" + subEntity) + .mapToPair(s -> { + MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); + return new Tuple2<>(d.getIdentifier(), d); + }); + + //create blocks for deduplication + JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); + + //create relations by comparing only elements in the same group + final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); + + JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity)); + + //save the simrel in the workingdir + spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); + + //create atomic actions + JavaRDD> newSimRels = relationsRDD + .map(this::createSequenceFileRow); + + simRel = simRel.union(newSimRels); + } + + simRel.mapToPair(r -> r) + .saveAsHadoopFile(rawSet, Text.class, Text.class, 
SequenceFileOutputFormat.class, GzipCodec.class); + } + + } + + public Tuple2 createSequenceFileRow(Relation relation) throws JsonProcessingException { + + ObjectMapper mapper = new ObjectMapper(); + + String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget(); + AtomicAction aa = new AtomicAction<>(Relation.class, relation); + + return new Tuple2<>( + new Text(id), + new Text(mapper.writeValueAsString(aa)) + ); + } + + public Relation createSimRel(String source, String target, String entity){ + final Relation r = new Relation(); + r.setSource(source); + r.setTarget(target); + + switch(entity){ + case "result": + r.setRelClass("resultResult_dedupSimilarity_isSimilarTo"); + break; + case "organization": + r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo"); + break; + default: + r.setRelClass("isSimilarTo"); + break; + } + return r; + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession .builder() .appName(SparkCreateSimRels.class.getSimpleName()) .master(parser.get("master")) + .config(conf) + .enableHiveSupport() .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String entity = parser.get("entity"); - final String targetPath = parser.get("targetPath"); -// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); - final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); - - final long total = sc.textFile(inputPath + "/" + entity).count(); - - JavaPairRDD mapDocument = sc.textFile(inputPath + "/" + entity) - .mapToPair(s->{ - MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf,s); - return new Tuple2<>(d.getIdentifier(), d);}); - - //create blocks for deduplication - JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); -// JavaPairRDD> blocks = Deduper.createBlocks(sc, mapDocument, dedupConf); - - //create relations by comparing only elements in the same group - final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); -// final JavaPairRDD dedupRels = Deduper.computeRelations(sc, blocks, dedupConf); - - final JavaRDD isSimilarToRDD = dedupRels.map(simRel -> { - final Relation r = new Relation(); - r.setSource(simRel._1()); - r.setTarget(simRel._2()); - r.setRelClass("isSimilarTo"); - return r; - }); - - spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity)); - } -} \ No newline at end of file + +} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java deleted file mode 100644 index 4f5458a24..000000000 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java +++ /dev/null @@ -1,144 +0,0 @@ -package eu.dnetlib.dedup; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.actionmanager.actions.AtomicAction; -import eu.dnetlib.actionmanager.common.Agent; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import 
eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.model.MapDocument; -import eu.dnetlib.pace.util.MapDocumentUtil; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.dom4j.Document; -import org.dom4j.DocumentException; -import org.dom4j.Element; -import org.dom4j.io.SAXReader; -import scala.Tuple2; - -import java.io.Serializable; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; - -public class SparkCreateSimRels2 implements Serializable { - - private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class); - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json"))); - parser.parseArgument(args); - - new SparkCreateSimRels2().run(parser); - } - - private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { - - //read oozie parameters - final String graphBasePath = parser.get("graphBasePath"); - final String rawSet = parser.get("rawSet"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - - try (SparkSession spark = getSparkSession(parser)) { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - //create empty sequenceFile for the accumulation - JavaRDD> simRel = sc.emptyRDD(); - - //for each dedup configuration - for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { - final String entity = dedupConf.getWf().getEntityType(); - final String subEntity = dedupConf.getWf().getSubEntityValue(); - - JavaPairRDD mapDocument = sc.textFile(graphBasePath + "/" + subEntity) - .mapToPair(s -> { - MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); - return new Tuple2<>(d.getIdentifier(), d); - }); - - //create blocks for deduplication - JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); - - //create relations by comparing only elements in the same group - final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); - - JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity)); - - //save the simrel in the workingdir - spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); - - //create atomic actions - JavaRDD> newSimRels = relationsRDD - .map(this::createSequenceFileRow); - - simRel = simRel.union(newSimRels); - } - - simRel.mapToPair(r -> r) - .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); - } - - } - - public Tuple2 createSequenceFileRow(Relation relation) throws JsonProcessingException { - - 
ObjectMapper mapper = new ObjectMapper(); - - String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget(); - //TODO do be replaced by the new implementation of AtomicAction - AtomicAction aa = new AtomicAction("rawSet", new Agent("agentId", "agentName", Agent.AGENT_TYPE.service), relation.getSource(), relation.getRelClass(), relation.getTarget(), new ObjectMapper().writeValueAsString(relation).getBytes()); - - return new Tuple2<>( - new Text(id), - new Text(mapper.writeValueAsString(aa)) - ); - } - - public Relation createSimRel(String source, String target, String entity){ - final Relation r = new Relation(); - r.setSource(source); - r.setTarget(target); - - switch(entity){ - case "result": - r.setRelClass("resultResult_dedupSimilarity_isSimilarTo"); - break; - case "organization": - r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo"); - break; - default: - r.setRelClass("isSimilarTo"); - break; - } - return r; - } - - private static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - - return SparkSession - .builder() - .appName(SparkCreateSimRels2.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - } - -} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelation.java new file mode 100644 index 000000000..12d9f31b3 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelation.java @@ -0,0 +1,169 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.IOException; + +public class SparkPropagateRelation { + + enum FieldType { + SOURCE, + TARGET + } + + final static String SOURCEJSONPATH = "$.source"; + final static String TARGETJSONPATH = "$.target"; + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json"))); + parser.parseArgument(args); + + new SparkPropagateRelation().run(parser); + } + + public void run(ArgumentApplicationParser parser) { + + final String graphBasePath = parser.get("graphBasePath"); + final String workingPath = parser.get("workingPath"); + final String dedupGraphPath = parser.get("dedupGraphPath"); + + try (SparkSession spark = getSparkSession(parser)) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + 
final Dataset mergeRels = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", "*")).as(Encoders.bean(Relation.class)); + + final JavaPairRDD mergedIds = mergeRels + .where("relClass == 'merges'") + .select(mergeRels.col("source"), mergeRels.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(1), r.getString(0))); + + JavaRDD relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation")); + + JavaRDD newRels = relations.mapToPair( + (PairFunction) s -> + new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE); + } + return v1._2()._1(); + }) + .mapToPair( + (PairFunction) s -> + new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET); + } + return v1._2()._1(); + }).filter(SparkPropagateRelation::containsDedup) + .repartition(500); + + //update deleted by inference + relations = relations.mapToPair( + (PairFunction) s -> + new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return updateDeletedByInference(v1._2()._1(), Relation.class); + } + return v1._2()._1(); + }) + .mapToPair( + (PairFunction) s -> + new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return updateDeletedByInference(v1._2()._1(), Relation.class); + } + return v1._2()._1(); + }) + .repartition(500); + + newRels.union(relations).repartition(1000) + .saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class); + } + } + + private static boolean containsDedup(final String json) { + final String source = MapDocumentUtil.getJPathString(SOURCEJSONPATH, json); + final String target = MapDocumentUtil.getJPathString(TARGETJSONPATH, json); + + return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup"); + } + + private static String replaceField(final String json, final String id, final FieldType type) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + Relation relation = mapper.readValue(json, Relation.class); + if (relation.getDataInfo() == null) + relation.setDataInfo(new DataInfo()); + relation.getDataInfo().setDeletedbyinference(false); + switch (type) { + case SOURCE: + relation.setSource(id); + return mapper.writeValueAsString(relation); + case TARGET: + relation.setTarget(id); + return mapper.writeValueAsString(relation); + default: + throw new IllegalArgumentException(""); + } + } catch (IOException e) { + throw new RuntimeException("unable to deserialize json relation: " + json, e); + } + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession + .builder() + .appName(SparkPropagateRelation.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + } + + private static String updateDeletedByInference(final String json, final Class clazz) { + final ObjectMapper 
mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + Oaf entity = mapper.readValue(json, clazz); + if (entity.getDataInfo()== null) + entity.setDataInfo(new DataInfo()); + entity.getDataInfo().setDeletedbyinference(true); + return mapper.writeValueAsString(entity); + } catch (IOException e) { + throw new RuntimeException("Unable to convert json", e); + } + } +} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntity.java new file mode 100644 index 000000000..3fde1bdae --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntity.java @@ -0,0 +1,121 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import scala.Tuple2; + +import java.io.IOException; + +public class SparkUpdateEntity { + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json"))); + parser.parseArgument(args); + + new SparkUpdateEntity().run(parser); + } + + public void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { + + final String graphBasePath = parser.get("graphBasePath"); + final String workingPath = parser.get("workingPath"); + final String dedupGraphPath = parser.get("dedupGraphPath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + + try (SparkSession spark = getSparkSession(parser)) { + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { + + String subEntity = dedupConf.getWf().getSubEntityValue(); + + final Dataset df = spark.read().load(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = df + .where("relClass == 'merges'") + .select(df.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); + + final JavaRDD sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)); + + final JavaRDD dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity)); + + JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s), s)); + + Class mainClass; + switch (subEntity) { + case 
"publication": + mainClass = Publication.class; + break; + case "dataset": + mainClass = eu.dnetlib.dhp.schema.oaf.Dataset.class; + break; + case "datasource": + mainClass = Datasource.class; + break; + case "software": + mainClass = Software.class; + break; + case "organization": + mainClass = Organization.class; + break; + case "otherresearchproduct": + mainClass = OtherResearchProduct.class; + break; + default: + throw new IllegalArgumentException("Illegal type " + subEntity); + } + + JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1()); + map.union(dedupEntity).saveAsTextFile(dedupGraphPath + "/" + subEntity, GzipCodec.class); + } + } + + } + + private static String updateDeletedByInference(final String json, final Class clazz) { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + Oaf entity = mapper.readValue(json, clazz); + if (entity.getDataInfo()== null) + entity.setDataInfo(new DataInfo()); + entity.getDataInfo().setDeletedbyinference(true); + return mapper.writeValueAsString(entity); + } catch (IOException e) { + throw new RuntimeException("Unable to convert json", e); + } + } + + private static SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession + .builder() + .appName(SparkUpdateEntity.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + } +} diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json new file mode 100644 index 000000000..f7bf5e518 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url of the lookup service", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "the id of the actionset (orchestrator)", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/BuildRootRecordsWf.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/BuildRootRecordsWf.xml new file mode 100644 index 000000000..477e98791 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/BuildRootRecordsWf.xml @@ -0,0 +1,129 @@ + + + + graphBasePath + the raw graph base path + + + isLookUpUrl + the address of the lookUp service + + + actionSetId + id of the actionSet + + + workingPath + path of the working directory + + + dedupGraphPath + path of the dedup graph + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + 
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Create Merge Relations + eu.dnetlib.dedup.SparkCreateConnectedComponent + dhp-dedup-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} --conf + spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf + spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf + spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --la${isLookUpUrl} + --asi${actionSetId} + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Create Dedup Record + eu.dnetlib.dedup.SparkCreateDedupRecord + dhp-dedup-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} --conf + spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf + spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf + spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --la${isLookUpUrl} + --asi${actionSetId} + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Create Dedup Record + eu.dnetlib.dedup.SparkUpdateEntity + dhp-dedup-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} --conf + spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf + spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf + spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --la${isLookUpUrl} + --asi${actionSetId} + --o${dedupGraphPath} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml index 5ab6c9e47..a685db1e8 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml @@ -55,7 +55,7 @@ yarn-cluster cluster Create Similarity Relations - eu.dnetlib.dedup.SparkCreateSimRels2 + eu.dnetlib.dedup.SparkCreateSimRels dhp-dedup-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf @@ -64,10 +64,11 @@ spark.sql.warehouse.dir="/user/hive/warehouse" -mtyarn-cluster - --i${rawGraphBasePath} + --i${graphBasePath} --o${rawSet} --la${isLookUpUrl} --asi${actionSetId} + --w${workingPath} diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/UpdateRelationsWf.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/UpdateRelationsWf.xml new file mode 100644 index 000000000..c4b17860e --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/UpdateRelationsWf.xml @@ -0,0 +1,68 @@ + + + + graphBasePath + the raw graph base path + + + workingPath + path for the working directory + + + dedupGraphPath + path of the dedup graph + + + sparkDriverMemory + memory for driver process + + + 
sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Update Relations + eu.dnetlib.dedup.SparkPropagateRelation + dhp-dedup-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} --conf + spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf + spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf + spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --o${dedupGraphPath} + --w${workingPath} + + + + + +
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json
new file mode 100644
index 000000000..721a783e1
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "mt",
+    "paramLongName": "master",
+    "paramDescription": "should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "graphBasePath",
+    "paramDescription": "the base path of raw graph",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "the working directory path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "dedupGraphPath",
+    "paramDescription": "the path of the dedup graph",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json
new file mode 100644
index 000000000..76aea0537
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/updateEntity_parameters.json
@@ -0,0 +1,38 @@
+[
+{
+  "paramName": "mt",
+  "paramLongName": "master",
+  "paramDescription": "should be local or yarn",
+  "paramRequired": true
+},
+{
+  "paramName": "i",
+  "paramLongName": "graphBasePath",
+  "paramDescription": "the base path of raw graph",
+  "paramRequired": true
+},
+{
+  "paramName": "w",
+  "paramLongName": "workingPath",
+  "paramDescription": "the working directory path",
+  "paramRequired": true
+},
+{
+  "paramName": "la",
+  "paramLongName": "isLookUpUrl",
+  "paramDescription": "the url of the lookup service",
+  "paramRequired": true
+},
+{
+  "paramName": "asi",
+  "paramLongName": "actionSetId",
+  "paramDescription": "the id of the actionset (orchestrator)",
+  "paramRequired": true
+},
+  {
+    "paramName": "o",
+    "paramLongName": "dedupGraphPath",
+    "paramDescription": "the path of the dedup graph",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
index abb00d27c..09f8a0fd6 100644
--- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
+++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
@@ -1,21 +1,14 @@
 package eu.dnetlib.dedup;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 
 public class SparkCreateDedupTest {
@@ -27,22 +20,10 @@ public class SparkCreateDedupTest {
         configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
     }
 
-    @Test
-    @Ignore
-    public void createSimRelsTest() throws Exception {
-        SparkCreateSimRels.main(new String[] {
-                "-mt", "local[*]",
-                "-s", "/Users/miconis/dumps",
-                "-e", entity,
-                "-c", ArgumentApplicationParser.compressArgument(configuration),
-                "-t", "/tmp/dedup",
-        });
-    }
-
     @Test
     @Ignore
     public void createSimRelsTest2() throws Exception {
-        SparkCreateSimRels2.main(new String[] {
+        SparkCreateSimRels.main(new String[] {
                 "-mt", "local[*]",
                 "-s", "/Users/miconis/dumps",
                 "-e", entity,
                 "-c", ArgumentApplicationParser.compressArgument(configuration),
                 "-t", "/tmp/dedup",
         });
     }
@@ -98,4 +79,14 @@ public class SparkCreateDedupTest {
         System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
     }
 
+    @Test
+    public void testJoinEntities() throws Exception{
+        SparkJoinEntities.main(new String[] {
+                "-mt", "local[*]",
+                "-i", "/tmp/dedup",
+                "-w", "/tmp/dedup",
+                "-o", "/tmp/dedup",
+        });
+    }
+
 }