From d876f47d065addc4bedd93c45bd8d55038be7b64 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 13 May 2020 10:38:04 +0200 Subject: [PATCH] next step of MAG conversion implemented --- .../dnetlib/doiboost/mag/MagDataModel.scala | 53 +++ .../mag/SparkImportMagIntoDataset.scala | 2 +- .../doiboost/mag/SparkPreProcessMAG.scala | 63 ++++ .../{ => mag}/convert_mag_to_oaf_params.json | 0 .../dhp/doiboost/mag/oozie_app/workflow.xml | 25 +- .../doiboost/mag/preprocess_mag_params.json | 6 + .../doiboost/CrossrefMappingTest.scala | 19 +- .../dnetlib/doiboost/mag/DatasetModel.scala | 14 - .../dnetlib/doiboost/mag/MAGMappingTest.scala | 41 +-- .../dnetlib/doiboost/mag/invertedIndex.json | 334 ++++++++++++++++++ 10 files changed, 496 insertions(+), 61 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{ => mag}/convert_mag_to_oaf_params.json (100%) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json delete mode 100644 dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala create mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala new file mode 100644 index 0000000000..189e90ed9a --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -0,0 +1,53 @@ +package eu.dnetlib.doiboost.mag + + +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse + + +case class Papers(PaperId:Long, Rank:Integer, Doi:String, + DocType:String, PaperTitle:String, OriginalTitle:String, + BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String, + JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long], + Volume:String, Issue:String, FirstPage:String, LastPage:String, + ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long], + OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {} + + +case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {} + + + +case object ConversionUtil { + + + + def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = { + PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract)) + } + + + + def convertInvertedIndexString(json_input:String) :String = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(json_input) + + + + val idl = (json \ "IndexLength").extract[Int] + + if (idl > 0) { + val res = Array.ofDim[String](idl) + + val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]] + + for {(k:String,v:List[Int]) <- iid}{ + v.foreach(item => res(item) = k) + } + return res.mkString(" ") + + } + "" + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index 82ea48f33f..f291a92f99 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -63,7 +63,7 @@ object SparkImportMagIntoDataset { def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala new file mode 100644 index 0000000000..4c014a95cf --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -0,0 +1,63 @@ +package eu.dnetlib.doiboost.mag + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.sql.functions._ + +object SparkPreProcessMAG { + + + def main(args: Array[String]): Unit = { + + val logger: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + import spark.implicits._ + + logger.info("Phase 1) make uninque DOI in Papers:") + + val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers] + + + // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one + val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) => + var r = if (p1 == null) p2 else p1 + if (p1 != null && p2 != null) { + if (p1.CreatedDate != null && p2.CreatedDate != null) { + if (p1.CreatedDate.before(p2.CreatedDate)) + r = p1 + else + r = p2 + } else { + r = if (p1.CreatedDate == null) p2 else p1 + } + } + r + }.map(_._2) + + val distinctPaper: Dataset[Papers] = spark.createDataset(result) + distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") + logger.info(s"Total number of element: ${result.count()}") + + logger.info("Phase 2) convert InverdIndex Abastrac to string") + val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract] + pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") + + + distinctPaper.joinWith(pa, col("PaperId").eqia) + + } + + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml index 801dca6122..ba6eea3646 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml @@ -34,7 +34,7 @@ - + @@ -59,5 +59,28 @@ + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkPreProcessMAG + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath${sourcePath} + --targetPath${targetPath} + --masteryarn-cluster + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json new file mode 100644 index 0000000000..bf0b80f69f --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala index 75a63d70f1..2d7cf42164 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala @@ -1,20 +1,15 @@ package eu.dnetlib.doiboost -import com.fasterxml.jackson.databind.SerializationFeature -import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result} +import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.utils.DHPUtils -import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF} -import eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset -import org.apache.spark.{SparkConf, sql} -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import eu.dnetlib.doiboost.crossref.Crossref2Oaf import org.codehaus.jackson.map.ObjectMapper -import org.junit.jupiter.api.Test - -import scala.io.Source import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ +import scala.io.Source import scala.util.matching.Regex @@ -24,12 +19,6 @@ class CrossrefMappingTest { val mapper = new ObjectMapper() - - def testMAGCSV() :Unit = { - SparkImportMagIntoDataset.main(null) - } - - @Test def testFunderRelationshipsMapping(): Unit = { val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala deleted file mode 100644 index 07235d770b..0000000000 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala +++ /dev/null @@ -1,14 +0,0 @@ -package eu.dnetlib.doiboost.mag - - -case class Papers(PaperId:Long, Rank:Integer, Doi:String, - DocType:String, PaperTitle:String, OriginalTitle:String, - BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String, - JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long], - Volume:String, Issue:String, FirstPage:String, LastPage:String, - ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long], - OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {} - - - - diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index f60e10cf5b..0aaaeb3777 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -1,13 +1,10 @@ package eu.dnetlib.doiboost.mag -import org.apache.spark.SparkConf -import org.apache.spark.api.java.function.ReduceFunction -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession} import org.codehaus.jackson.map.ObjectMapper import org.junit.jupiter.api.Test import org.slf4j.{Logger, LoggerFactory} -import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Assertions._ +import scala.io.Source class MAGMappingTest { @@ -18,34 +15,18 @@ class MAGMappingTest { //@Test def testMAGCSV(): Unit = { - - val conf: SparkConf = new SparkConf() - val spark: SparkSession = - SparkSession - .builder() - .config(conf) - .appName(getClass.getSimpleName) - .master("local[*]").getOrCreate() + SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" ")) + } - import spark.implicits._ - val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers] - logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}") - //implicit val mapEncoder = org.apache.spark.sql.Encoders.bean[Papers] - val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) => - var r = if (p1==null) p2 else p1 - if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate)) - r = p1 - else - r = p2 - r - }.map(_._2) - - - val distinctPaper:Dataset[Papers] = spark.createDataset(result) - distinctPaper.write.mode(SaveMode.Overwrite).save("/data/doiboost/mag/datasets/Papers_d") - logger.info(s"Total number of element: ${result.count()}") + @Test + def buildInvertedIndexTest() :Unit = { + val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString + val description = ConversionUtil.convertInvertedIndexString(json_input) + assertNotNull(description) + assertTrue(description.nonEmpty) + logger.debug(description) } diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json new file mode 100644 index 0000000000..0a84e330d2 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json @@ -0,0 +1,334 @@ +{ + "IndexLength": 139, + "InvertedIndex": { + "The": [ + 0, + 23, + 47 + ], + "invention": [ + 1, + 53 + ], + "discloses": [ + 2 + ], + "a": [ + 3, + 10, + 71, + 81, + 121 + ], + "treatment": [ + 4, + 69, + 85, + 96 + ], + "method": [ + 5, + 24, + 49 + ], + "of": [ + 6, + 9, + 19, + 57, + 84, + 117, + 120 + ], + "waste": [ + 7, + 118 + ], + "mash": [ + 8, + 119 + ], + "cane": [ + 11, + 122 + ], + "sugar": [ + 12, + 123 + ], + "factory,": [ + 13 + ], + "belonging": [ + 14 + ], + "to": [ + 15 + ], + "the": [ + 16, + 26, + 52, + 55, + 66, + 93, + 115, + 135 + ], + "technical": [ + 17, + 48 + ], + "field": [ + 18 + ], + "industrial": [ + 20 + ], + "wastewater": [ + 21 + ], + "treatment.": [ + 22 + ], + "comprises": [ + 25 + ], + "following": [ + 27 + ], + "steps": [ + 28 + ], + "of:": [ + 29 + ], + "(1)": [ + 30 + ], + "pretreatment;": [ + 31 + ], + "(2)": [ + 32 + ], + "primary": [ + 33 + ], + "concentration;": [ + 34 + ], + "(3)": [ + 35 + ], + "cooling": [ + 36 + ], + "sedimentation": [ + 37 + ], + "and": [ + 38, + 45, + 62, + 80, + 86, + 114, + 134 + ], + "dense": [ + 39 + ], + "slurry": [ + 40 + ], + "drying;": [ + 41 + ], + "(4)": [ + 42 + ], + "secondary": [ + 43 + ], + "concentration": [ + 44 + ], + "drying.": [ + 46 + ], + "disclosed": [ + 50 + ], + "by": [ + 51 + ], + "has": [ + 54 + ], + "advantages": [ + 56 + ], + "small": [ + 58 + ], + "investment,": [ + 59 + ], + "simple": [ + 60 + ], + "equipment": [ + 61 + ], + "easiness": [ + 63 + ], + "in": [ + 64, + 132 + ], + "popularization;": [ + 65 + ], + "product": [ + 67 + ], + "after": [ + 68 + ], + "is": [ + 70, + 91, + 98, + 102, + 112, + 130, + 137 + ], + "high-quality": [ + 72 + ], + "high": [ + 73 + ], + "value-added": [ + 74 + ], + "(fully": [ + 75 + ], + "water-soluble)": [ + 76 + ], + "potassium": [ + 77 + ], + "humate": [ + 78 + ], + "product,": [ + 79 + ], + "new": [ + 82 + ], + "mode": [ + 83 + ], + "profit": [ + 87 + ], + "enabling": [ + 88 + ], + "sustainable": [ + 89 + ], + "development": [ + 90 + ], + "realized;": [ + 92 + ], + "environmental": [ + 94 + ], + "protection": [ + 95 + ], + "effect": [ + 97 + ], + "good,": [ + 99 + ], + "water": [ + 100, + 106 + ], + "balance": [ + 101 + ], + "realized": [ + 103 + ], + "through": [ + 104 + ], + "final": [ + 105 + ], + "quality": [ + 107 + ], + "treatment,": [ + 108 + ], + "real": [ + 109 + ], + "zero": [ + 110 + ], + "emission": [ + 111 + ], + "realized,": [ + 113 + ], + "problem": [ + 116 + ], + "factory": [ + 124 + ], + "can": [ + 125 + ], + "be": [ + 126 + ], + "solved": [ + 127 + ], + "fundamentally;": [ + 128 + ], + "energy": [ + 129 + ], + "saved": [ + 131 + ], + "operation,": [ + 133 + ], + "feasibility": [ + 136 + ], + "high.": [ + 138 + ] + } +} \ No newline at end of file