forked from D-Net/dnet-hadoop
fixed process doiboost workflow:
- splitted OrcidToOAF into two phase preprocess and process - updated workflow used in production
This commit is contained in:
parent
bc4b86c27c
commit
c35c117601
|
@ -21,7 +21,7 @@ object SparkMapDumpIntoOAF {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
val spark: SparkSession =
|
val spark: SparkSession =
|
||||||
SparkSession
|
SparkSession
|
||||||
|
|
|
@ -1,61 +1,18 @@
|
||||||
package eu.dnetlib.doiboost.orcid
|
package eu.dnetlib.doiboost.orcid
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions._
|
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
object SparkConvertORCIDToOAF {
|
object SparkConvertORCIDToOAF {
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
|
||||||
|
|
||||||
def fixORCIDItem(item :ORCIDItem):ORCIDItem = {
|
|
||||||
new ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = {
|
|
||||||
import spark.implicits._
|
|
||||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
|
||||||
|
|
||||||
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
|
|
||||||
|
|
||||||
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
|
|
||||||
|
|
||||||
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
|
|
||||||
|
|
||||||
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
|
|
||||||
|
|
||||||
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
|
|
||||||
|
|
||||||
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
|
|
||||||
|
|
||||||
works.joinWith(authors, authors("oid").equalTo(works("oid")))
|
|
||||||
.map(i =>{
|
|
||||||
val doi = i._1.doi
|
|
||||||
var author = i._2
|
|
||||||
(doi, author)
|
|
||||||
}).groupBy(col("_1").alias("doi"))
|
|
||||||
.agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem]
|
|
||||||
.map(s => fixORCIDItem(s))
|
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
|
|
||||||
|
|
||||||
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
|
|
||||||
|
|
||||||
logger.info("Converting ORCID to OAF")
|
|
||||||
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
val spark: SparkSession =
|
val spark: SparkSession =
|
||||||
SparkSession
|
SparkSession
|
||||||
|
@ -64,11 +21,16 @@ object SparkConvertORCIDToOAF {
|
||||||
.appName(getClass.getSimpleName)
|
.appName(getClass.getSimpleName)
|
||||||
.master(parser.get("master")).getOrCreate()
|
.master(parser.get("master")).getOrCreate()
|
||||||
|
|
||||||
|
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
import spark.implicits._
|
||||||
|
|
||||||
val sourcePath = parser.get("sourcePath")
|
|
||||||
val workingPath = parser.get("workingPath")
|
val workingPath = parser.get("workingPath")
|
||||||
val targetPath = parser.get("targetPath")
|
val targetPath = parser.get("targetPath")
|
||||||
run(spark, sourcePath, workingPath, targetPath)
|
|
||||||
|
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
|
||||||
|
|
||||||
|
logger.info("Converting ORCID to OAF")
|
||||||
|
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
package eu.dnetlib.doiboost.orcid
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||||
|
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||||
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
object SparkPreprocessORCID {
|
||||||
|
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
|
||||||
|
|
||||||
|
def fixORCIDItem(item :ORCIDItem):ORCIDItem = {
|
||||||
|
ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = {
|
||||||
|
import spark.implicits._
|
||||||
|
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
|
||||||
|
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
|
||||||
|
|
||||||
|
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
|
||||||
|
|
||||||
|
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
|
||||||
|
|
||||||
|
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
|
||||||
|
|
||||||
|
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
|
||||||
|
|
||||||
|
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
|
||||||
|
|
||||||
|
works.joinWith(authors, authors("oid").equalTo(works("oid")))
|
||||||
|
.map(i =>{
|
||||||
|
val doi = i._1.doi
|
||||||
|
val author = i._2
|
||||||
|
(doi, author)
|
||||||
|
}).groupBy(col("_1").alias("doi"))
|
||||||
|
.agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem]
|
||||||
|
.map(s => fixORCIDItem(s))
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
|
||||||
|
}
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val conf: SparkConf = new SparkConf()
|
||||||
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json")))
|
||||||
|
parser.parseArgument(args)
|
||||||
|
val spark: SparkSession =
|
||||||
|
SparkSession
|
||||||
|
.builder()
|
||||||
|
.config(conf)
|
||||||
|
.appName(getClass.getSimpleName)
|
||||||
|
.master(parser.get("master")).getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
|
val sourcePath = parser.get("sourcePath")
|
||||||
|
val workingPath = parser.get("workingPath")
|
||||||
|
|
||||||
|
run(spark, sourcePath, workingPath)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,7 +18,7 @@ object SparkMapUnpayWallToOAF {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
val spark: SparkSession =
|
val spark: SparkSession =
|
||||||
SparkSession
|
SparkSession
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
[
|
||||||
|
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
|
||||||
|
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false},
|
||||||
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
|
|
||||||
|
]
|
|
@ -0,0 +1,6 @@
|
||||||
|
[
|
||||||
|
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
|
||||||
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false},
|
||||||
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
|
|
||||||
|
]
|
|
@ -0,0 +1,6 @@
|
||||||
|
[
|
||||||
|
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true},
|
||||||
|
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false},
|
||||||
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
|
|
||||||
|
]
|
|
@ -368,7 +368,7 @@
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Convert ORCID to Dataset</name>
|
<name>Convert ORCID to Dataset</name>
|
||||||
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
|
<class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Convert ORCID to Dataset</name>
|
<name>Convert ORCID to Dataset</name>
|
||||||
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
|
<class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
[
|
[
|
||||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true},
|
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true},
|
||||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false},
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false},
|
||||||
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
|
||||||
|
|
||||||
]
|
]
|
|
@ -46,7 +46,7 @@ class MappingORCIDToOAFTest {
|
||||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
|
|
||||||
SparkConvertORCIDToOAF.run( spark,sourcePath, workingPath, targetPath)
|
SparkPreprocessORCID.run( spark,sourcePath, workingPath)
|
||||||
|
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue