From 168bfb496a44398c920dbf849ed1ef5d4dbd1a13 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 31 Jul 2020 09:06:57 +0200
Subject: [PATCH] adopted dedup to the new schema

---
 .../dhp/oa/dedup/jpath/JsonPathTest.java      |  8 ++
 .../eu/dnetlib/dedup/DedupRecordFactory.java  | 23 ++++--
 .../dedup/SparkCreateConnectedComponent.java  | 12 ++-
 .../dnetlib/dedup/SparkCreateDedupRecord.java | 25 ++++---
 .../eu/dnetlib/dedup/SparkCreateSimRels.java  | 11 ++-
 .../dedup/sx/SparkPropagateRelationsJob.java  | 63 ++++------------
 .../sx/SparkUpdateEntityWithDedupInfo.scala   | 75 +++++++++++++++++++
 .../dhp/sx/dedup/oozie_app/workflow.xml       |  2 +-
 8 files changed, 150 insertions(+), 69 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala

diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
index 9518efdb5..1759180d2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
@@ -289,4 +289,12 @@ public class JsonPathTest {
 
         System.out.println("d = " + d);
     }
+
+    @Test
+    public void testNull() throws Exception {
+        final Object p = null;
+
+        System.out.println((String) p);
+
+    }
 }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
index d03cc2589..32b2503b2 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -6,6 +6,7 @@ import java.util.Collection;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import scala.Tuple2;
@@ -30,10 +33,16 @@ public class DedupRecordFactory {
         final DedupConfig dedupConf) {
         long ts = System.currentTimeMillis();
         //
-        final JavaPairRDD<String, String> inputJsonEntities = sc
-            .textFile(entitiesInputPath)
+        final JavaPairRDD<String, String> inputJsonEntities = spark
+            .read()
+            .load(entitiesInputPath)
+            .as(Encoders.kryo(Oaf.class))
+            .map(
+                (MapFunction<Oaf, String>) p -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(p),
+                Encoders.STRING())
+            .javaRDD()
             .mapToPair(
-                (PairFunction<String, String, String>) it -> new Tuple2<String, String>(
+                (PairFunction<String, String, String>) it -> new Tuple2<>(
                     MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it));
 
         // <source, target>: source is the dedup_id, target is the id of the mergedIn
@@ -74,9 +83,9 @@ public class DedupRecordFactory {
         }
     }
 
-    private static Publication publicationMerger(Tuple2<String, List<String>> e, final long ts) {
+    private static DLIPublication publicationMerger(Tuple2<String, List<String>> e, final long ts) {
 
-        Publication p = new Publication(); // the result of the merge, to be returned at the end
+        DLIPublication p = new DLIPublication(); // the result of the merge, to be returned at the end
 
         p.setId(e._1());
 
@@ -110,9 +119,9 @@ public class DedupRecordFactory {
         return p;
     }
 
-    private static Dataset datasetMerger(Tuple2<String, List<String>> e, final long ts) {
+    private static DLIDataset datasetMerger(Tuple2<String, List<String>> e, final long ts) {
 
-        Dataset d = new Dataset(); // the result of the merge, to be returned at the end
+        DLIDataset d = new DLIDataset(); // the result of the merge, to be returned at the end
 
         d.setId(e._1());
 
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
index f86410d29..8646ac742 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
@@ -9,18 +9,21 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.hash.Hashing;
 
 import eu.dnetlib.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dedup.graph.GraphProcessor;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -42,7 +45,6 @@ public class SparkCreateConnectedComponent {
             .master(parser.get("master"))
             .getOrCreate();
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         final String inputPath = parser.get("sourcePath");
         final String entity = parser.get("entity");
         final String targetPath = parser.get("targetPath");
@@ -50,8 +52,12 @@ public class SparkCreateConnectedComponent {
         // DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
         final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
 
-        final JavaPairRDD<Object, String> vertexes = sc
-            .textFile(inputPath + "/" + entity)
+        final JavaPairRDD<Object, String> vertexes = spark
+            .read()
+            .load(inputPath + "/" + entity)
+            .as(Encoders.kryo(Oaf.class))
+            .map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
+            .javaRDD()
             .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
             .mapToPair(
                 (PairFunction<String, Object, String>) s -> new Tuple2<Object, String>(getHashcode(s), s));
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
index d87269f03..fa0ee1efb 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
@@ -4,10 +4,10 @@ package eu.dnetlib.dedup;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.pace.config.DedupConfig;
@@ -41,12 +41,19 @@ public class SparkCreateDedupRecord {
                 DedupUtility.createEntityPath(sourcePath, entity),
                 OafEntityType.valueOf(entity),
                 dedupConf);
-        dedupRecord
-            .map(
-                r -> {
-                    ObjectMapper mapper = new ObjectMapper();
-                    return mapper.writeValueAsString(r);
-                })
-            .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
+        spark
+            .createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .save(dedupPath + "/" + entity + "/dedup_records");
+//
+//
+//        dedupRecord
+//            .map(
+//                r -> {
+//                    ObjectMapper mapper = new ObjectMapper();
+//                    return mapper.writeValueAsString(r);
+//                })
+//            .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
     }
 }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
index 41fe911e7..6a98b112a 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
@@ -7,10 +7,13 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
@@ -46,8 +49,12 @@ public class SparkCreateSimRels {
         // DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
         final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
 
-        JavaPairRDD<String, MapDocument> mapDocument = sc
-            .textFile(inputPath + "/" + entity)
+        JavaPairRDD<String, MapDocument> mapDocument = spark
+            .read()
+            .load(inputPath + "/" + entity)
+            .as(Encoders.kryo(Oaf.class))
+            .map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
+            .javaRDD()
             .mapToPair(
                 s -> {
                     MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
index e3d4fdbe3..59c069399 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
@@ -14,16 +14,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
 
 public class SparkPropagateRelationsJob {
-    enum FieldType {
-        SOURCE, TARGET
-    }
-
-    static final String SOURCEJSONPATH = "$.source";
-    static final String TARGETJSONPATH = "$.target";
 
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -39,7 +34,6 @@ public class SparkPropagateRelationsJob {
             .master(parser.get("master"))
             .getOrCreate();
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         final String relationPath = parser.get("relationPath");
         final String mergeRelPath = parser.get("mergeRelPath");
         final String targetRelPath = parser.get("targetRelPath");
@@ -50,63 +44,38 @@ public class SparkPropagateRelationsJob {
             .as(Encoders.bean(Relation.class))
             .where("relClass == 'merges'");
 
-        final Dataset<Relation> rels = spark.read().load(relationPath).as(Encoders.bean(Relation.class));
+        final Dataset<DLIRelation> rels = spark
+            .read()
+            .load(relationPath)
+            .as(Encoders.kryo(DLIRelation.class))
+            .map(
+                (MapFunction<DLIRelation, DLIRelation>) r -> r,
+                Encoders.bean(DLIRelation.class));
 
-        final Dataset<Relation> firstJoin = rels
+        final Dataset<DLIRelation> firstJoin = rels
             .joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
             .map(
-                (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
+                (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
                     final Relation mergeRelation = r._2();
-                    final Relation relation = r._1();
-
+                    final DLIRelation relation = r._1();
                     if (mergeRelation != null)
                         relation.setSource(mergeRelation.getSource());
                     return relation;
                 },
-                Encoders.bean(Relation.class));
+                Encoders.bean(DLIRelation.class));
 
-        final Dataset<Relation> secondJoin = firstJoin
+        final Dataset<DLIRelation> secondJoin = firstJoin
             .joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
             .map(
-                (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
+                (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
                     final Relation mergeRelation = r._2();
-                    final Relation relation = r._1();
+                    final DLIRelation relation = r._1();
                     if (mergeRelation != null)
                         relation.setTarget(mergeRelation.getSource());
                     return relation;
                 },
-                Encoders.bean(Relation.class));
+                Encoders.kryo(DLIRelation.class));
 
         secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
     }
-
-    private static boolean containsDedup(final String json) {
-        final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json);
-        final String target = DHPUtils.getJPathString(TARGETJSONPATH, json);
-
-        return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
-    }
-
-    private static String replaceField(final String json, final String id, final FieldType type) {
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        try {
-            Relation relation = mapper.readValue(json, Relation.class);
-            if (relation.getDataInfo() == null)
-                relation.setDataInfo(new DataInfo());
-            relation.getDataInfo().setDeletedbyinference(false);
-            switch (type) {
-                case SOURCE:
-                    relation.setSource(id);
-                    return mapper.writeValueAsString(relation);
-                case TARGET:
-                    relation.setTarget(id);
-                    return mapper.writeValueAsString(relation);
-                default:
-                    throw new IllegalArgumentException("");
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("unable to deserialize json relation: " + json, e);
-        }
-    }
 }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
new file mode 100644
index 000000000..1b29fdea4
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
@@ -0,0 +1,75 @@
+package eu.dnetlib.dedup.sx
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.LoggerFactory
+import org.apache.spark.sql.functions.col
+
+object SparkUpdateEntityWithDedupInfo {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
+    val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
+    parser.parseArgument(args)
+
+    val workingPath: String = parser.get("workingPath")
+    logger.info(s"Working dir path = $workingPath")
+
+    implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity]
+    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
+
+    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
+    implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
+    implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
+    implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+
+
+    val spark: SparkSession = SparkSession
+      .builder()
+      .appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName)
+      .master(parser.get("master"))
+      .getOrCreate()
+
+
+    val entityPath = parser.get("entityPath")
+    val mergeRelPath = parser.get("mergeRelPath")
+    val dedupRecordPath = parser.get("dedupRecordPath")
+    val entity = parser.get("entity")
+    val destination = parser.get("targetPath")
+
+    val mergedIds = spark.read.load(mergeRelPath).as[Relation]
+      .where("relClass == 'merges'")
+      .select(col("target"))
+
+
+    val entities: Dataset[(String, OafEntity)] = spark
+      .read
+      .load(entityPath).as[OafEntity]
+      .map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder))
+
+
+    val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left")
+      .map(k => {
+        val e: OafEntity = k._1._2
+        val t = k._2
+        if (t != null && t.getString(0).nonEmpty) {
+          if (e.getDataInfo == null) {
+            e.setDataInfo(OafUtils.generateDataInfo())
+          }
+          e.getDataInfo.setDeletedbyinference(true)
+        }
+        e
+      })
+
+    val dedupRecords :Dataset[OafEntity] = spark.read.load(dedupRecordPath).as[OafEntity]
+
+    finalDataset.union(dedupRecords)
+      .repartition(1200).write
+      .mode(SaveMode.Overwrite).save(destination)
+
+  }
+
+}
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
index 2f22bb764..d1196bfb1 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
@@ -144,7 +144,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Update ${entity} and add DedupRecord</name>
-            <class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
+            <class>eu.dnetlib.dedup.sx.SparkUpdateEntityWithDedupInfo</class>
             <jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
             <spark-opts>--executor-memory ${sparkExecutorMemory}