diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
new file mode 100644
index 000000000..189e90ed9
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -0,0 +1,53 @@
+package eu.dnetlib.doiboost.mag
+
+
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods.parse
+
+
+case class Papers(PaperId:Long, Rank:Integer, Doi:String,
+ DocType:String, PaperTitle:String, OriginalTitle:String,
+ BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String,
+ JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long],
+ Volume:String, Issue:String, FirstPage:String, LastPage:String,
+ ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long],
+ OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {}
+
+
+case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {}
+
+
+
+case object ConversionUtil {
+
+
+
+ def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = {
+ PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract))
+ }
+
+
+
+ def convertInvertedIndexString(json_input:String) :String = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(json_input)
+
+
+
+ val idl = (json \ "IndexLength").extract[Int]
+
+ if (idl > 0) {
+ val res = Array.ofDim[String](idl)
+
+ val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]]
+
+ for {(k:String,v:List[Int]) <- iid}{
+ v.foreach(item => res(item) = k)
+ }
+ return res.mkString(" ")
+
+ }
+ ""
+ }
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
index 82ea48f33..f291a92f9 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
@@ -63,7 +63,7 @@ object SparkImportMagIntoDataset {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
new file mode 100644
index 000000000..4c014a95c
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
@@ -0,0 +1,63 @@
+package eu.dnetlib.doiboost.mag
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.sql.functions._
+
+object SparkPreProcessMAG {
+
+
+ def main(args: Array[String]): Unit = {
+
+ val logger: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+ import spark.implicits._
+
+ logger.info("Phase 1) make uninque DOI in Papers:")
+
+ val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers]
+
+
+ // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
+ val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) =>
+ var r = if (p1 == null) p2 else p1
+ if (p1 != null && p2 != null) {
+ if (p1.CreatedDate != null && p2.CreatedDate != null) {
+ if (p1.CreatedDate.before(p2.CreatedDate))
+ r = p1
+ else
+ r = p2
+ } else {
+ r = if (p1.CreatedDate == null) p2 else p1
+ }
+ }
+ r
+ }.map(_._2)
+
+ val distinctPaper: Dataset[Papers] = spark.createDataset(result)
+ distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
+ logger.info(s"Total number of element: ${result.count()}")
+
+ logger.info("Phase 2) convert InverdIndex Abastrac to string")
+ val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract]
+ pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
+
+
+ distinctPaper.joinWith(pa, col("PaperId").eqia)
+
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json
similarity index 100%
rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json
rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
index 801dca612..ba6eea364 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
@@ -34,7 +34,7 @@
-
+
@@ -59,5 +59,28 @@
+
+
+
+
+ yarn-cluster
+ cluster
+ Convert Mag to Dataset
+ eu.dnetlib.doiboost.mag.SparkPreProcessMAG
+ dhp-doiboost-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ ${sparkExtraOPT}
+
+ --sourcePath${sourcePath}
+ --targetPath${targetPath}
+ --masteryarn-cluster
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
new file mode 100644
index 000000000..bf0b80f69
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
@@ -0,0 +1,6 @@
+[
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
+ {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
index 75a63d70f..2d7cf4216 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
@@ -1,20 +1,15 @@
package eu.dnetlib.doiboost
-import com.fasterxml.jackson.databind.SerializationFeature
-import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result}
+import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.utils.DHPUtils
-import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF}
-import eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset
-import org.apache.spark.{SparkConf, sql}
-import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+import eu.dnetlib.doiboost.crossref.Crossref2Oaf
import org.codehaus.jackson.map.ObjectMapper
-import org.junit.jupiter.api.Test
-
-import scala.io.Source
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
+import scala.io.Source
import scala.util.matching.Regex
@@ -24,12 +19,6 @@ class CrossrefMappingTest {
val mapper = new ObjectMapper()
-
- def testMAGCSV() :Unit = {
- SparkImportMagIntoDataset.main(null)
- }
-
-
@Test
def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala
deleted file mode 100644
index 07235d770..000000000
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/DatasetModel.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package eu.dnetlib.doiboost.mag
-
-
-case class Papers(PaperId:Long, Rank:Integer, Doi:String,
- DocType:String, PaperTitle:String, OriginalTitle:String,
- BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String,
- JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long],
- Volume:String, Issue:String, FirstPage:String, LastPage:String,
- ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long],
- OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {}
-
-
-
-
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
index f60e10cf5..0aaaeb377 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
@@ -1,13 +1,10 @@
package eu.dnetlib.doiboost.mag
-import org.apache.spark.SparkConf
-import org.apache.spark.api.java.function.ReduceFunction
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions._
+import scala.io.Source
class MAGMappingTest {
@@ -18,34 +15,18 @@ class MAGMappingTest {
//@Test
def testMAGCSV(): Unit = {
-
- val conf: SparkConf = new SparkConf()
- val spark: SparkSession =
- SparkSession
- .builder()
- .config(conf)
- .appName(getClass.getSimpleName)
- .master("local[*]").getOrCreate()
+ SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
+ }
- import spark.implicits._
- val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers]
- logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}")
- //implicit val mapEncoder = org.apache.spark.sql.Encoders.bean[Papers]
- val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) =>
- var r = if (p1==null) p2 else p1
- if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate))
- r = p1
- else
- r = p2
- r
- }.map(_._2)
-
-
- val distinctPaper:Dataset[Papers] = spark.createDataset(result)
- distinctPaper.write.mode(SaveMode.Overwrite).save("/data/doiboost/mag/datasets/Papers_d")
- logger.info(s"Total number of element: ${result.count()}")
+ @Test
+ def buildInvertedIndexTest() :Unit = {
+ val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
+ val description = ConversionUtil.convertInvertedIndexString(json_input)
+ assertNotNull(description)
+ assertTrue(description.nonEmpty)
+ logger.debug(description)
}
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json
new file mode 100644
index 000000000..0a84e330d
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/mag/invertedIndex.json
@@ -0,0 +1,334 @@
+{
+ "IndexLength": 139,
+ "InvertedIndex": {
+ "The": [
+ 0,
+ 23,
+ 47
+ ],
+ "invention": [
+ 1,
+ 53
+ ],
+ "discloses": [
+ 2
+ ],
+ "a": [
+ 3,
+ 10,
+ 71,
+ 81,
+ 121
+ ],
+ "treatment": [
+ 4,
+ 69,
+ 85,
+ 96
+ ],
+ "method": [
+ 5,
+ 24,
+ 49
+ ],
+ "of": [
+ 6,
+ 9,
+ 19,
+ 57,
+ 84,
+ 117,
+ 120
+ ],
+ "waste": [
+ 7,
+ 118
+ ],
+ "mash": [
+ 8,
+ 119
+ ],
+ "cane": [
+ 11,
+ 122
+ ],
+ "sugar": [
+ 12,
+ 123
+ ],
+ "factory,": [
+ 13
+ ],
+ "belonging": [
+ 14
+ ],
+ "to": [
+ 15
+ ],
+ "the": [
+ 16,
+ 26,
+ 52,
+ 55,
+ 66,
+ 93,
+ 115,
+ 135
+ ],
+ "technical": [
+ 17,
+ 48
+ ],
+ "field": [
+ 18
+ ],
+ "industrial": [
+ 20
+ ],
+ "wastewater": [
+ 21
+ ],
+ "treatment.": [
+ 22
+ ],
+ "comprises": [
+ 25
+ ],
+ "following": [
+ 27
+ ],
+ "steps": [
+ 28
+ ],
+ "of:": [
+ 29
+ ],
+ "(1)": [
+ 30
+ ],
+ "pretreatment;": [
+ 31
+ ],
+ "(2)": [
+ 32
+ ],
+ "primary": [
+ 33
+ ],
+ "concentration;": [
+ 34
+ ],
+ "(3)": [
+ 35
+ ],
+ "cooling": [
+ 36
+ ],
+ "sedimentation": [
+ 37
+ ],
+ "and": [
+ 38,
+ 45,
+ 62,
+ 80,
+ 86,
+ 114,
+ 134
+ ],
+ "dense": [
+ 39
+ ],
+ "slurry": [
+ 40
+ ],
+ "drying;": [
+ 41
+ ],
+ "(4)": [
+ 42
+ ],
+ "secondary": [
+ 43
+ ],
+ "concentration": [
+ 44
+ ],
+ "drying.": [
+ 46
+ ],
+ "disclosed": [
+ 50
+ ],
+ "by": [
+ 51
+ ],
+ "has": [
+ 54
+ ],
+ "advantages": [
+ 56
+ ],
+ "small": [
+ 58
+ ],
+ "investment,": [
+ 59
+ ],
+ "simple": [
+ 60
+ ],
+ "equipment": [
+ 61
+ ],
+ "easiness": [
+ 63
+ ],
+ "in": [
+ 64,
+ 132
+ ],
+ "popularization;": [
+ 65
+ ],
+ "product": [
+ 67
+ ],
+ "after": [
+ 68
+ ],
+ "is": [
+ 70,
+ 91,
+ 98,
+ 102,
+ 112,
+ 130,
+ 137
+ ],
+ "high-quality": [
+ 72
+ ],
+ "high": [
+ 73
+ ],
+ "value-added": [
+ 74
+ ],
+ "(fully": [
+ 75
+ ],
+ "water-soluble)": [
+ 76
+ ],
+ "potassium": [
+ 77
+ ],
+ "humate": [
+ 78
+ ],
+ "product,": [
+ 79
+ ],
+ "new": [
+ 82
+ ],
+ "mode": [
+ 83
+ ],
+ "profit": [
+ 87
+ ],
+ "enabling": [
+ 88
+ ],
+ "sustainable": [
+ 89
+ ],
+ "development": [
+ 90
+ ],
+ "realized;": [
+ 92
+ ],
+ "environmental": [
+ 94
+ ],
+ "protection": [
+ 95
+ ],
+ "effect": [
+ 97
+ ],
+ "good,": [
+ 99
+ ],
+ "water": [
+ 100,
+ 106
+ ],
+ "balance": [
+ 101
+ ],
+ "realized": [
+ 103
+ ],
+ "through": [
+ 104
+ ],
+ "final": [
+ 105
+ ],
+ "quality": [
+ 107
+ ],
+ "treatment,": [
+ 108
+ ],
+ "real": [
+ 109
+ ],
+ "zero": [
+ 110
+ ],
+ "emission": [
+ 111
+ ],
+ "realized,": [
+ 113
+ ],
+ "problem": [
+ 116
+ ],
+ "factory": [
+ 124
+ ],
+ "can": [
+ 125
+ ],
+ "be": [
+ 126
+ ],
+ "solved": [
+ 127
+ ],
+ "fundamentally;": [
+ 128
+ ],
+ "energy": [
+ 129
+ ],
+ "saved": [
+ 131
+ ],
+ "operation,": [
+ 133
+ ],
+ "feasibility": [
+ 136
+ ],
+ "high.": [
+ 138
+ ]
+ }
+}
\ No newline at end of file