From 9d9e9edbd242c7810a6d44740543211a9d07a19d Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 6 Aug 2020 10:28:24 +0200
Subject: [PATCH] improved extractEntity Relation workflows using dataset

---
 .../dhp/schema/scholexplorer/DLIRelation.java | 30 ------------
 .../eu/dnetlib/dedup/DedupRecordFactory.java  |  2 +-
 .../eu/dnetlib/dedup/SparkCreateSimRels.java  |  1 +
 .../dedup/sx/SparkPropagateRelationsJob.java  | 41 ++++++++--------
 .../sx/SparkUpdateEntityWithDedupInfo.scala   |  6 +--
 .../dhp/sx/dedup/oozie_app/workflow.xml       |  5 ++
 .../eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala | 21 +-------
 .../dhp/sx/ebi/SparkAddLinkUpdates.scala      | 18 +++----
 .../dhp/sx/ebi/SparkCreateEBIDataFrame.scala  |  8 ++--
 .../sx/graph/SparkSXGeneratePidSimlarity.java |  8 ++--
 .../SparkScholexplorerCreateRawGraphJob.java  |  9 ++--
 .../sx/graph/SparkSplitOafTODLIEntities.scala | 17 +++----
 .../dhp/sx/graph/SparkXMLToOAFDataset.scala   |  6 +--
 .../parser/AbstractScholexplorerParser.java   |  9 ++--
 .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 48 +++++++++----------
 .../SparkExportContentForOpenAire.scala       |  7 ++-
 .../dhp/export/ExportDLITOOAFTest.scala       |  4 +-
 17 files changed, 95 insertions(+), 145 deletions(-)
 delete mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java
deleted file mode 100644
index ca85fa14f..000000000
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java
+++ /dev/null
@@ -1,30 +0,0 @@
-
-package eu.dnetlib.dhp.schema.scholexplorer;
-
-import java.util.List;
-
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class DLIRelation extends Relation {
-
-    private String dateOfCollection;
-
-    private List<KeyValue> collectedFrom;
-
-    public List<KeyValue> getCollectedFrom() {
-        return collectedFrom;
-    }
-
-    public void setCollectedFrom(List<KeyValue> collectedFrom) {
-        this.collectedFrom = collectedFrom;
-    }
-
-    public String getDateOfCollection() {
-        return dateOfCollection;
-    }
-
-    public void setDateOfCollection(String dateOfCollection) {
-        this.dateOfCollection = dateOfCollection;
-    }
-}
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
index 32b2503b2..bba277ad6 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -100,7 +100,7 @@ public class DedupRecordFactory {
             .forEach(
                 pub -> {
                     try {
-                        Publication publication = mapper.readValue(pub, Publication.class);
+                        DLIPublication publication = mapper.readValue(pub, DLIPublication.class);
 
                         p.mergeFrom(publication);
                         p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
index 6a98b112a..572824e3d 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
@@ -55,6 +55,7 @@ public class SparkCreateSimRels {
                 .as(Encoders.kryo(Oaf.class))
                 .map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
                 .javaRDD()
+                .repartition(1000)
                 .mapToPair(
                     s -> {
                         MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
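For context on the SparkCreateSimRels change: the job deserializes the kryo-encoded Oaf records, re-serializes them as JSON strings, and now repartitions to 1000 partitions before the per-record MapDocument hashing, so skewed input splits do not pin the work to a handful of executors. A minimal, self-contained Scala sketch of that serialize-then-repartition shape (the Entity class, data and partition count are illustrative, not from the patch):

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.spark.sql.{Encoders, SparkSession}
    import scala.beans.BeanProperty

    // Toy bean standing in for Oaf; @BeanProperty gives Jackson the getters it expects.
    case class Entity(@BeanProperty var id: String, @BeanProperty var title: String)

    object SimRelsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sim-rels-sketch").master("local[*]").getOrCreate()
        import spark.implicits._

        val entities = Seq(Entity("50|a", "x"), Entity("50|b", "y")).toDS()

        // Re-serialize each record to JSON, as the hunk does before building MapDocuments.
        val asJson = entities.map(e => new ObjectMapper().writeValueAsString(e))(Encoders.STRING)

        // Repartition before the expensive per-record step; the patch uses 1000.
        val balanced = asJson.javaRDD.repartition(4)
        println(balanced.getNumPartitions) // 4
      }
    }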
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
index 59c069399..13c0ff0c5 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java
@@ -1,21 +1,16 @@
 
 package eu.dnetlib.dedup.sx;
 
-import java.io.IOException;
-
+import eu.dnetlib.dhp.schema.scholexplorer.OafUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.*;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
-import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
 
 public class SparkPropagateRelationsJob {
@@ -44,37 +39,39 @@ public class SparkPropagateRelationsJob {
             .as(Encoders.bean(Relation.class))
             .where("relClass == 'merges'");
 
-        final Dataset<DLIRelation> rels = spark
+        final Dataset<Relation> rels = spark
             .read()
             .load(relationPath)
-            .as(Encoders.kryo(DLIRelation.class))
+            .as(Encoders.kryo(Relation.class))
             .map(
-                (MapFunction<DLIRelation, DLIRelation>) r -> r,
-                Encoders.bean(DLIRelation.class));
+                (MapFunction<Relation, Relation>) r -> r,
+                Encoders.bean(Relation.class));
 
-        final Dataset<DLIRelation> firstJoin = rels
+        final Dataset<Relation> firstJoin = rels
             .joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
             .map(
-                (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
+                (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
                     final Relation mergeRelation = r._2();
-                    final DLIRelation relation = r._1();
+                    final Relation relation = r._1();
                     if (mergeRelation != null)
                         relation.setSource(mergeRelation.getSource());
+                    if (relation.getDataInfo()==null)
+                        relation.setDataInfo(OafUtils.generateDataInfo("0.9",false));
                     return relation;
                 },
-                Encoders.bean(DLIRelation.class));
+                Encoders.bean(Relation.class));
 
-        final Dataset<DLIRelation> secondJoin = firstJoin
+        final Dataset<Relation> secondJoin = firstJoin
             .joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
             .map(
-                (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
+                (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
                     final Relation mergeRelation = r._2();
-                    final DLIRelation relation = r._1();
+                    final Relation relation = r._1();
                     if (mergeRelation != null)
                         relation.setTarget(mergeRelation.getSource());
                     return relation;
                 },
-                Encoders.kryo(DLIRelation.class));
+                Encoders.kryo(Relation.class));
 
         secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
     }
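The rewritten SparkPropagateRelationsJob resolves both endpoints of every relation against the dedup "merges" relations in two passes: a first left-outer join rewrites each source id onto its dedup representative, a second does the same for targets, and relations arriving without a DataInfo get a default one (trust 0.9 in the hunk). The two-pass idea in compact Scala (case classes and data are illustrative; only the join shape mirrors the patch):

    import org.apache.spark.sql.{Dataset, SparkSession}

    case class Rel(source: String, target: String)
    case class Merge(source: String, target: String) // source = dedup record id, target = merged raw id

    object PropagateSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("propagate-sketch").master("local[*]").getOrCreate()
        import spark.implicits._

        val merges = Seq(Merge("dedup_1", "raw_a")).toDS()
        val rels   = Seq(Rel("raw_a", "raw_b")).toDS()

        // Pass 1: rewrite sources onto their dedup representative.
        val firstJoin: Dataset[Rel] = rels
          .joinWith(merges, merges("target") === rels("source"), "left_outer")
          .map { case (r, m) => if (m != null) r.copy(source = m.source) else r }

        // Pass 2: the same trick for targets.
        val secondJoin: Dataset[Rel] = firstJoin
          .joinWith(merges, merges("target") === firstJoin("target"), "left_outer")
          .map { case (r, m) => if (m != null) r.copy(target = m.source) else r }

        secondJoin.show() // raw_a is rewritten to dedup_1 on the source side
      }
    }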
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
index 1b29fdea4..ce883e207 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
@@ -2,7 +2,7 @@ package eu.dnetlib.dedup.sx
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown, OafUtils}
 import org.apache.commons.io.IOUtils
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
@@ -11,7 +11,7 @@ import org.apache.spark.sql.functions.col
 
 object SparkUpdateEntityWithDedupInfo {
 
   def main(args: Array[String]): Unit = {
-    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json")))
     val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
     parser.parseArgument(args)
@@ -24,7 +24,7 @@ object SparkUpdateEntityWithDedupInfo {
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
     implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
     implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
-    implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+
 
 
     val spark: SparkSession = SparkSession
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
index d1196bfb1..2214fd20a 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
@@ -53,6 +53,7 @@
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --executor-cores=${sparkExecutorCores}
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
@@ -77,6 +78,7 @@
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --executor-cores=${sparkExecutorCores}
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
@@ -101,6 +103,7 @@
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --executor-cores=${sparkExecutorCores}
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
@@ -125,6 +128,7 @@
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --executor-cores=${sparkExecutorCores}
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
@@ -149,6 +153,7 @@
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --executor-cores=${sparkExecutorCores}
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
index be92b60eb..d1bf39475 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
@@ -1,6 +1,6 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
 import org.apache.spark.sql.{Encoder, Encoders}
 import org.apache.spark.sql.expressions.Aggregator
 
@@ -168,26 +168,7 @@
 
   }
 
-  def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
-    override def zero: DLIRelation = new DLIRelation()
-
-    override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
-      a._2
-    }
-
-
-    override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
-      if(b!= null) b else a
-    }
-    override def finish(reduction: DLIRelation): DLIRelation = reduction
-
-    override def bufferEncoder: Encoder[DLIRelation] =
-      Encoders.kryo(classOf[DLIRelation])
-
-    override def outputEncoder: Encoder[DLIRelation] =
-      Encoders.kryo(classOf[DLIRelation])
-  }
 
 
 
 
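The deleted aggregator simply kept one record per key; the call sites below now use getRelationAggregator(), which presumably does the same on plain Relation. A sketch of such a keep-one-per-key Aggregator, reconstructed from the removed DLIRelation version (the patch itself does not show the new method):

    import eu.dnetlib.dhp.schema.oaf.Relation
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    object RelationAggregatorSketch {
      // Collapses all (key, relation) pairs that share a key into a single relation.
      def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] =
        new Aggregator[(String, Relation), Relation, Relation] {
          override def zero: Relation = new Relation()
          override def reduce(b: Relation, a: (String, Relation)): Relation = a._2
          override def merge(a: Relation, b: Relation): Relation = if (b != null) b else a
          override def finish(reduction: Relation): Relation = reduction
          override def bufferEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
          override def outputEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
        }
    }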
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
index 2d3f75b91..d5cdb8a7c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
@@ -1,8 +1,8 @@
 package eu.dnetlib.dhp.sx.ebi
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Relation, Dataset => OafDataset}
 import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, OafUtils, ProvenaceInfo}
 import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
 import eu.dnetlib.dhp.utils.DHPUtils
 import eu.dnetlib.scholexplorer.relation.RelationMapper
@@ -115,8 +115,8 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
     val dnetPublicationId = s"50|${DHPUtils.md5(s"$pmid::pmid")}"
 
     targets.flatMap(l => {
-      val relation = new DLIRelation
-      val inverseRelation = new DLIRelation
+      val relation = new Relation
+      val inverseRelation = new Relation
       val targetDnetId = s"50|${DHPUtils.md5(s"${l.tpid.toLowerCase.trim}::${l.tpidType.toLowerCase.trim}")}"
       val relInfo = relationMapper.get(l.relation.toLowerCase)
       val relationSemantic = relInfo.getOriginal
@@ -177,7 +177,7 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
     val workingPath = parser.get("workingPath")
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
     implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
-    implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
+    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
     implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
     implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
@@ -197,7 +197,7 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
 
     val oDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/baseline_links_updates_oaf").as[Oaf]
 
-    oDataset.filter(p =>p.isInstanceOf[DLIRelation]).map(p => p.asInstanceOf[DLIRelation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
+    oDataset.filter(p =>p.isInstanceOf[Relation]).map(p => p.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
 
     oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
 
@@ -230,14 +230,14 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
 
 
-    val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
-    val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
+    val rel: Dataset[Relation] = spark.read.load(s"$workingPath/relation").as[Relation]
+    val relupdate : Dataset[Relation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[Relation]
 
     rel.union(relupdate)
       .map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+      .agg(EBIAggregator.getRelationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite)
       .save(s"$workingPath/baseline_relation_ebi")
 
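The write path above dedups relations through a composite key: source::relType::target identifies a logical edge, groupByKey buckets duplicates coming from the graph and from the EBI update feed, and the aggregator keeps one record per bucket. The same shape in miniature, with mapGroups standing in for the Aggregator (toy Edge class and data):

    import org.apache.spark.sql.SparkSession

    case class Edge(source: String, relType: String, target: String)

    object DedupByKeySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("dedup-by-key").master("local[*]").getOrCreate()
        import spark.implicits._

        // Two copies of the same logical edge, e.g. one already in the graph and one from an update.
        val rels = Seq(
          Edge("50|a", "references", "50|b"),
          Edge("50|a", "references", "50|b")
        ).toDS()

        val deduped = rels
          .map(d => (s"${d.source}::${d.relType}::${d.target}", d))
          .groupByKey(_._1)
          .mapGroups { case (_, group) => group.next()._2 } // keep one per key, as the aggregator does
        println(deduped.count()) // 1
      }
    }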
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
index 008e3c99b..9fc970446 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
@@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
 import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -38,7 +38,7 @@ object SparkCreateEBIDataFrame {
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
     implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
-    implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
+    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
 
     //    logger.info("Extract Publication and relation from publication_xml")
     //    val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
@@ -63,7 +63,7 @@
     //    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
     val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
     val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
-    val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
+    val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
     publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
       .groupByKey(_._1)(Encoders.STRING)
       .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
@@ -78,7 +78,7 @@
 
     relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+      .agg(EBIAggregator.getRelationAggregator().toColumn)
       .map(p => p._2)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java
index f3d7fd40f..7003b179d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java
@@ -10,7 +10,7 @@
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
-import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
@@ -55,18 +55,18 @@ public class SparkSXGeneratePidSimlarity {
                     .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
             .distinct();
 
-        JavaRDD<DLIRelation> simRel = datasetSimRel
+        JavaRDD<Relation> simRel = datasetSimRel
             .union(publicationSimRel)
             .map(
                 s -> {
-                    final DLIRelation r = new DLIRelation();
+                    final Relation r = new Relation();
                     r.setSource(s._1());
                     r.setTarget(s._2());
                     r.setRelType("similar");
                     return r;
                 });
         spark
-            .createDataset(simRel.rdd(), Encoders.bean(DLIRelation.class))
+            .createDataset(simRel.rdd(), Encoders.bean(Relation.class))
             .distinct()
             .write()
             .mode(SaveMode.Overwrite)
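SparkSXGeneratePidSimlarity now emits plain Relation beans: every surviving id pair becomes a relation with relType "similar", saved through Encoders.bean so it lands as a columnar dataset. The pair-to-relation step in Scala (the input pairs are fabricated; the Relation setters are the ones used in the hunk):

    import eu.dnetlib.dhp.schema.oaf.Relation
    import org.apache.spark.sql.{Encoders, SparkSession}

    object PidSimRelSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("pid-sim-sketch").master("local[*]").getOrCreate()
        val sc = spark.sparkContext

        // Pairs of (entity id, candidate id) that share a normalized pid.
        val simPairs = sc.parallelize(Seq(("50|abc", "50|def")))

        val simRel = simPairs.map { case (s, t) =>
          val r = new Relation
          r.setSource(s)
          r.setTarget(t)
          r.setRelType("similar")
          r
        }

        spark.createDataset(simRel)(Encoders.bean(classOf[Relation]))
          .distinct()
          .show()
      }
    }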
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java
index 385ac4d1a..05fb826db 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java
@@ -31,7 +31,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import net.minidev.json.JSONArray;
@@ -156,9 +155,9 @@ public class SparkScholexplorerCreateRawGraphJob {
                 SparkSXGeneratePidSimlarity
                     .generateDataFrame(
                         spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", ""));
-                RDD<DLIRelation> rdd = union
+                RDD<Relation> rdd = union
                     .mapToPair(
-                        (PairFunction<String, String, DLIRelation>) f -> {
+                        (PairFunction<String, String, Relation>) f -> {
                             final String source = getJPathString(SOURCEJSONPATH, f);
                             final String target = getJPathString(TARGETJSONPATH, f);
                             final String reltype = getJPathString(RELJSONPATH, f);
@@ -175,7 +174,7 @@ public class SparkScholexplorerCreateRawGraphJob {
                                     source.toLowerCase(),
                                     reltype.toLowerCase(),
                                     target.toLowerCase())),
-                            mapper.readValue(f, DLIRelation.class));
+                            mapper.readValue(f, Relation.class));
                     })
                 .reduceByKey(
                     (a, b) -> {
@@ -186,7 +185,7 @@
                     .rdd();
 
                 spark
-                    .createDataset(rdd, Encoders.bean(DLIRelation.class))
+                    .createDataset(rdd, Encoders.bean(Relation.class))
                     .write()
                     .mode(SaveMode.Overwrite)
                     .save(targetPath);
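The raw-graph job keys every relation JSON by the lower-cased source::relType::target triple and reduces duplicates before creating the typed Relation dataset. Sketched with the RDD API and plain Jackson (hypothetical JSON rows; the real job extracts the key with JSON-path helpers):

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.spark.sql.SparkSession

    object RawRelationDedupSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("raw-rel-dedup").master("local[*]").getOrCreate()
        val sc = spark.sparkContext

        // The same logical edge twice, differing only in case.
        val rows = sc.parallelize(Seq(
          """{"source":"50|A","relType":"references","target":"50|B"}""",
          """{"source":"50|a","relType":"references","target":"50|b"}"""
        ))

        val deduped = rows
          .map { json =>
            val n = new ObjectMapper().readTree(json)
            val key = Seq("source", "relType", "target")
              .map(f => n.get(f).asText.toLowerCase)
              .mkString("::")
            (key, json)
          }
          .reduceByKey((a, _) => a) // keep one record per logical edge
          .values
        println(deduped.count()) // 1
      }
    }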
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
index b36c6abef..d0df28b2d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
@@ -1,8 +1,8 @@
 package eu.dnetlib.dhp.sx.graph
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
 import eu.dnetlib.dhp.sx.ebi.EBIAggregator
 import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
 import org.apache.commons.io.IOUtils
@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory
 
 object SparkSplitOafTODLIEntities {
 
-  def getKeyRelation(rel:DLIRelation):String = {
+  def getKeyRelation(rel:Relation):String = {
     s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
 
 
@@ -30,13 +30,14 @@ object SparkSplitOafTODLIEntities {
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
     implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
     implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
-    implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
 
 
 
     val spark:SparkSession = SparkSession
       .builder()
       .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .master(parser.get("master"))
       .getOrCreate()
 
@@ -47,7 +48,7 @@ object SparkSplitOafTODLIEntities {
 
     val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
     val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
-    val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
+    val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation]
 
 
 
@@ -86,12 +87,12 @@ object SparkSplitOafTODLIEntities {
 
     OAFDataset
-      .filter(s => s != null && s.isInstanceOf[DLIRelation])
-      .map(s =>s.asInstanceOf[DLIRelation])
+      .filter(s => s != null && s.isInstanceOf[Relation])
+      .map(s =>s.asInstanceOf[Relation])
       .union(ebi_relation)
       .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+      .agg(EBIAggregator.getRelationAggregator().toColumn)
       .map(p => p._2)
       .repartition(1000)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
 
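SparkSplitOafTODLIEntities now also registers Kryo as Spark's shuffle serializer, the usual companion to the Encoders.kryo datasets declared above; without it, shuffled records fall back to slower Java serialization. The builder line in isolation (app name and master are placeholders):

    import org.apache.spark.sql.SparkSession

    object KryoSessionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("kryo-session-sketch")
          // Same setting the patch adds: shuffle data is serialized with Kryo.
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .master("local[*]")
          .getOrCreate()
        println(spark.conf.get("spark.serializer"))
      }
    }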
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
index f74a6043b..c63ad4370 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
@@ -1,8 +1,8 @@
 package eu.dnetlib.dhp.sx.graph
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
 import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
 import eu.dnetlib.scholexplorer.relation.RelationMapper
 import org.apache.commons.io.IOUtils
@@ -40,7 +40,7 @@ object SparkXMLToOAFDataset {
     implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
     implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
     implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
-    implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+    implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
 
     val relationMapper = RelationMapper.load
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
index 75f28c129..f56760c82 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
@@ -14,7 +14,6 @@ import org.apache.commons.logging.LogFactory;
 
 import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
 import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
 import eu.dnetlib.dhp.utils.DHPUtils;
@@ -175,8 +174,8 @@ public abstract class AbstractScholexplorerParser {
                 .stream()
                 .flatMap(
                     n -> {
-                        final List<DLIRelation> rels = new ArrayList<>();
-                        DLIRelation r = new DLIRelation();
+                        final List<Relation> rels = new ArrayList<>();
+                        Relation r = new Relation();
                         r.setSource(parsedObject.getId());
                         final String relatedPid = n.getTextValue();
                         final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
@@ -184,7 +183,6 @@
                         String relationSemantic = n.getAttributes().get("relationType");
                         String inverseRelation;
                         final String targetId = generateId(relatedPid, relatedPidType, relatedType);
-                        r.setDateOfCollection(dateOfCollection);
                         if (relationMapper.containsKey(relationSemantic.toLowerCase())) {
                             RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
                             relationSemantic = relInfo.getOriginal();
@@ -199,14 +197,13 @@ public abstract class AbstractScholexplorerParser {
                         r.setCollectedfrom(parsedObject.getCollectedfrom());
                         r.setDataInfo(di);
                         rels.add(r);
-                        r = new DLIRelation();
+                        r = new Relation();
                         r.setDataInfo(di);
                         r.setSource(targetId);
                         r.setTarget(parsedObject.getId());
                         r.setRelType(inverseRelation);
                         r.setRelClass("datacite");
                         r.setCollectedfrom(parsedObject.getCollectedfrom());
-                        r.setDateOfCollection(dateOfCollection);
                         rels.add(r);
                         if ("unknown".equalsIgnoreCase(relatedType))
                             result
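With setDateOfCollection gone, the parser hunk still emits two plain relations per related identifier: the forward edge and its inverse, sharing the same DataInfo and collectedfrom. The pairing, sketched in Scala (the parser itself is Java; the ids and semantics here are made up, and DataInfo/collectedfrom are omitted for brevity):

    import eu.dnetlib.dhp.schema.oaf.Relation

    object RelationPairSketch {
      // Builds the forward and inverse edges the parser appends to its rels list.
      def relationPair(sourceId: String, targetId: String,
                       semantic: String, inverseSemantic: String): Seq[Relation] = {
        val forward = new Relation
        forward.setSource(sourceId)
        forward.setTarget(targetId)
        forward.setRelType(semantic)
        forward.setRelClass("datacite")

        val inverse = new Relation
        inverse.setSource(targetId)
        inverse.setTarget(sourceId)
        inverse.setRelType(inverseSemantic)
        inverse.setRelClass("datacite")

        Seq(forward, inverse)
      }

      def main(args: Array[String]): Unit =
        relationPair("50|pub", "60|data", "references", "isReferencedBy")
          .foreach(r => println(s"${r.getSource} -[${r.getRelType}]-> ${r.getTarget}"))
    }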
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
index 86b68fbd2..88277b827 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
@@ -6,7 +6,7 @@ import java.time.format.DateTimeFormatter
 import eu.dnetlib.dhp.common.PacePerson
 import eu.dnetlib.dhp.schema.action.AtomicAction
 import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang3.StringUtils
 import org.codehaus.jackson.map.ObjectMapper
@@ -273,29 +273,29 @@
 
   }
 
-  def convertDLIRelation(r: DLIRelation): Relation = {
-
-    val result = new Relation
-    if (!relationTypeMapping.contains(r.getRelType))
-      return null
-
-    if (r.getCollectedFrom == null || r.getCollectedFrom.size() == 0 || (r.getCollectedFrom.size() == 1 && r.getCollectedFrom.get(0) == null))
-      return null
-    val t = relationTypeMapping.get(r.getRelType)
-
-    result.setRelType("resultResult")
-    result.setRelClass(t.get._1)
-    result.setSubRelType(t.get._2)
-    result.setCollectedfrom(r.getCollectedFrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
-    result.setSource(generateId(r.getSource))
-    result.setTarget(generateId(r.getTarget))
-
-    if (result.getSource.equals(result.getTarget))
-      return null
-    result.setDataInfo(generateDataInfo())
-
-    result
-  }
+//  def convertDLIRelation(r: DLIRelation): Relation = {
+//
+//    val result = new Relation
+//    if (!relationTypeMapping.contains(r.getRelType))
+//      return null
+//
+//    if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null))
+//      return null
+//    val t = relationTypeMapping.get(r.getRelType)
+//
+//    result.setRelType("resultResult")
+//    result.setRelClass(t.get._1)
+//    result.setSubRelType(t.get._2)
+//    result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
+//    result.setSource(generateId(r.getSource))
+//    result.setTarget(generateId(r.getTarget))
+//
+//    if (result.getSource.equals(result.getTarget))
+//      return null
+//    result.setDataInfo(generateDataInfo())
+//
+//    result
+//  }
 
 
   def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
index fd8f2d136..165c3340b 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
@@ -2,7 +2,7 @@ package eu.dnetlib.dhp.`export`
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.io.compress.GzipCodec
@@ -39,14 +39,13 @@ object SparkExportContentForOpenAire {
     implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication])
     implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset])
     implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
-    implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
+
     import spark.implicits._
 
 
     val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
-      .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
+      .map(s => new ObjectMapper().readValue(s, classOf[Relation]))
       .filter(p => p.getDataInfo.getDeletedbyinference == false)
-      .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
     spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
 
     val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
index c9d33dbe4..0bd746cff 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
@@ -4,7 +4,7 @@ import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 
 import eu.dnetlib.dhp.schema.oaf.Relation
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -65,7 +65,7 @@ class ExportDLITOOAFTest {
 
     val json = Source.fromInputStream(getClass.getResourceAsStream("relation.json")).mkString
 
-    val oaf =DLIToOAF.convertDLIRelation(mapper.readValue(json, classOf[DLIRelation]))
+    val oaf =mapper.readValue(json, classOf[Relation])
     println(mapper.writeValueAsString(oaf))
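With convertDLIRelation retired, the export job and its test deserialize relation JSON straight into the OAF Relation bean and keep only records not flagged as deleted by inference. That load in isolation (the JSON line is fabricated; the getters come from dhp-schemas, and the null guard is an addition of mine):

    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.oaf.Relation

    object ReadRelationSketch {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        val json = """{"source":"50|a","target":"50|b","relType":"references"}"""

        // Deserialize directly into Relation, as the test now does.
        val rel = mapper.readValue(json, classOf[Relation])

        // The export job drops relations deleted by inference; guard against a missing DataInfo.
        val keep = rel.getDataInfo == null || !rel.getDataInfo.getDeletedbyinference
        println(s"${rel.getSource} -> ${rel.getTarget}, keep=$keep")
      }
    }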