enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
10 changed files with 496 additions and 61 deletions
Showing only changes of commit d876f47d06 - Show all commits

View File

@ -0,0 +1,53 @@
package eu.dnetlib.doiboost.mag
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
case class Papers(PaperId:Long, Rank:Integer, Doi:String,
DocType:String, PaperTitle:String, OriginalTitle:String,
BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String,
JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long],
Volume:String, Issue:String, FirstPage:String, LastPage:String,
ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long],
OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {}
case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {}
case object ConversionUtil {
def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = {
PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract))
}
def convertInvertedIndexString(json_input:String) :String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(json_input)
val idl = (json \ "IndexLength").extract[Int]
if (idl > 0) {
val res = Array.ofDim[String](idl)
val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]]
for {(k:String,v:List[Int]) <- iid}{
v.foreach(item => res(item) = k)
}
return res.mkString(" ")
}
""
}
}

View File

@ -63,7 +63,7 @@ object SparkImportMagIntoDataset {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json"))) val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json")))
parser.parseArgument(args) parser.parseArgument(args)
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession

View File

@ -0,0 +1,63 @@
package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
object SparkPreProcessMAG {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._
logger.info("Phase 1) make uninque DOI in Papers:")
val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) =>
var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) {
if (p1.CreatedDate != null && p2.CreatedDate != null) {
if (p1.CreatedDate.before(p2.CreatedDate))
r = p1
else
r = p2
} else {
r = if (p1.CreatedDate == null) p2 else p1
}
}
r
}.map(_._2)
val distinctPaper: Dataset[Papers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info(s"Total number of element: ${result.count()}")
logger.info("Phase 2) convert InverdIndex Abastrac to string")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
distinctPaper.joinWith(pa, col("PaperId").eqia)
}
}

View File

@ -34,7 +34,7 @@
<delete path='${targetPath}'/> <delete path='${targetPath}'/>
<mkdir path='${targetPath}'/> <mkdir path='${targetPath}'/>
</fs> </fs>
<ok to="ConvertMagToDataset"/> <ok to="PreprocessMag"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -59,5 +59,28 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="PreprocessMag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkPreProcessMAG</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -0,0 +1,6 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -1,20 +1,15 @@
package eu.dnetlib.doiboost package eu.dnetlib.doiboost
import com.fasterxml.jackson.databind.SerializationFeature import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result}
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF} import eu.dnetlib.doiboost.crossref.Crossref2Oaf
import eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset
import org.apache.spark.{SparkConf, sql}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
import org.junit.jupiter.api.Test
import scala.io.Source
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.matching.Regex import scala.util.matching.Regex
@ -24,12 +19,6 @@ class CrossrefMappingTest {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
def testMAGCSV() :Unit = {
SparkImportMagIntoDataset.main(null)
}
@Test @Test
def testFunderRelationshipsMapping(): Unit = { def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString

View File

@ -1,14 +0,0 @@
package eu.dnetlib.doiboost.mag
case class Papers(PaperId:Long, Rank:Integer, Doi:String,
DocType:String, PaperTitle:String, OriginalTitle:String,
BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String,
JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long],
Volume:String, Issue:String, FirstPage:String, LastPage:String,
ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long],
OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {}

View File

@ -1,13 +1,10 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import org.apache.spark.SparkConf
import org.apache.spark.api.java.function.ReduceFunction
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Assertions._
import scala.io.Source
class MAGMappingTest { class MAGMappingTest {
@ -18,34 +15,18 @@ class MAGMappingTest {
//@Test //@Test
def testMAGCSV(): Unit = { def testMAGCSV(): Unit = {
SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
val conf: SparkConf = new SparkConf() }
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master("local[*]").getOrCreate()
import spark.implicits._ @Test
val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers] def buildInvertedIndexTest() :Unit = {
logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}") val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
//implicit val mapEncoder = org.apache.spark.sql.Encoders.bean[Papers] val description = ConversionUtil.convertInvertedIndexString(json_input)
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) => assertNotNull(description)
var r = if (p1==null) p2 else p1 assertTrue(description.nonEmpty)
if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate))
r = p1
else
r = p2
r
}.map(_._2)
val distinctPaper:Dataset[Papers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save("/data/doiboost/mag/datasets/Papers_d")
logger.info(s"Total number of element: ${result.count()}")
logger.debug(description)
} }

View File

@ -0,0 +1,334 @@
{
"IndexLength": 139,
"InvertedIndex": {
"The": [
0,
23,
47
],
"invention": [
1,
53
],
"discloses": [
2
],
"a": [
3,
10,
71,
81,
121
],
"treatment": [
4,
69,
85,
96
],
"method": [
5,
24,
49
],
"of": [
6,
9,
19,
57,
84,
117,
120
],
"waste": [
7,
118
],
"mash": [
8,
119
],
"cane": [
11,
122
],
"sugar": [
12,
123
],
"factory,": [
13
],
"belonging": [
14
],
"to": [
15
],
"the": [
16,
26,
52,
55,
66,
93,
115,
135
],
"technical": [
17,
48
],
"field": [
18
],
"industrial": [
20
],
"wastewater": [
21
],
"treatment.": [
22
],
"comprises": [
25
],
"following": [
27
],
"steps": [
28
],
"of:": [
29
],
"(1)": [
30
],
"pretreatment;": [
31
],
"(2)": [
32
],
"primary": [
33
],
"concentration;": [
34
],
"(3)": [
35
],
"cooling": [
36
],
"sedimentation": [
37
],
"and": [
38,
45,
62,
80,
86,
114,
134
],
"dense": [
39
],
"slurry": [
40
],
"drying;": [
41
],
"(4)": [
42
],
"secondary": [
43
],
"concentration": [
44
],
"drying.": [
46
],
"disclosed": [
50
],
"by": [
51
],
"has": [
54
],
"advantages": [
56
],
"small": [
58
],
"investment,": [
59
],
"simple": [
60
],
"equipment": [
61
],
"easiness": [
63
],
"in": [
64,
132
],
"popularization;": [
65
],
"product": [
67
],
"after": [
68
],
"is": [
70,
91,
98,
102,
112,
130,
137
],
"high-quality": [
72
],
"high": [
73
],
"value-added": [
74
],
"(fully": [
75
],
"water-soluble)": [
76
],
"potassium": [
77
],
"humate": [
78
],
"product,": [
79
],
"new": [
82
],
"mode": [
83
],
"profit": [
87
],
"enabling": [
88
],
"sustainable": [
89
],
"development": [
90
],
"realized;": [
92
],
"environmental": [
94
],
"protection": [
95
],
"effect": [
97
],
"good,": [
99
],
"water": [
100,
106
],
"balance": [
101
],
"realized": [
103
],
"through": [
104
],
"final": [
105
],
"quality": [
107
],
"treatment,": [
108
],
"real": [
109
],
"zero": [
110
],
"emission": [
111
],
"realized,": [
113
],
"problem": [
116
],
"factory": [
124
],
"can": [
125
],
"be": [
126
],
"solved": [
127
],
"fundamentally;": [
128
],
"energy": [
129
],
"saved": [
131
],
"operation,": [
133
],
"feasibility": [
136
],
"high.": [
138
]
}
}