From c35c1176011be0086ce199793346a43dd0809ebd Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 14 Jul 2021 09:44:32 +0200 Subject: [PATCH] fixed process doiboost workflow: - splitted OrcidToOAF into two phase preprocess and process - updated workflow used in production --- .../crossref/SparkMapDumpIntoOAF.scala | 2 +- .../orcid/SparkConvertORCIDToOAF.scala | 56 +++------------ .../doiboost/orcid/SparkPreprocessORCID.scala | 70 +++++++++++++++++++ .../doiboost/uw/SparkMapUnpayWallToOAF.scala | 2 +- .../convert_crossref_dump_to_oaf_params.json | 6 ++ .../doiboost/convert_orcid_to_oaf_params.json | 6 ++ .../doiboost/convert_uw_to_oaf_params.json | 6 ++ .../dhp/doiboost/oozie_app/workflow.xml | 2 +- .../doiboost/orcid_oaf/oozie_app/workflow.xml | 2 +- ...rams.json => preprocess_orcid_params.json} | 3 +- .../orcid/MappingORCIDToOAFTest.scala | 2 +- 11 files changed, 103 insertions(+), 54 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json rename dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/{convert_map_to_oaf_params.json => preprocess_orcid_params.json} (59%) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 57acaf404d..c65916610c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -21,7 +21,7 @@ object SparkMapDumpIntoOAF { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index a359eb3c6e..9117bcb347 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -1,61 +1,18 @@ package eu.dnetlib.doiboost.orcid -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.Publication -import eu.dnetlib.dhp.schema.orcid.OrcidDOI import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def fixORCIDItem(item :ORCIDItem):ORCIDItem = { - new ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) - - } - - - def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = { - import spark.implicits._ - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - - val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) - - spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") - - val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) - - spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") - - val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] - - val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] - - works.joinWith(authors, authors("oid").equalTo(works("oid"))) - .map(i =>{ - val doi = i._1.doi - var author = i._2 - (doi, author) - }).groupBy(col("_1").alias("doi")) - .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] - .map(s => fixORCIDItem(s)) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") - - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] - - logger.info("Converting ORCID to OAF") - dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) - } - def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession @@ -64,11 +21,16 @@ object SparkConvertORCIDToOAF { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + import spark.implicits._ - val sourcePath = parser.get("sourcePath") val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - run(spark, sourcePath, workingPath, targetPath) + + val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + + logger.info("Converting ORCID to OAF") + dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala new file mode 100644 index 0000000000..d6911cfa78 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -0,0 +1,70 @@ +package eu.dnetlib.doiboost.orcid + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.oa.merge.AuthorMerger +import eu.dnetlib.dhp.schema.oaf.Publication +import eu.dnetlib.dhp.schema.orcid.OrcidDOI +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +object SparkPreprocessORCID { + val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + + def fixORCIDItem(item :ORCIDItem):ORCIDItem = { + ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) + + } + + + def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = { + import spark.implicits._ + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + + val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) + + spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") + + val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) + + spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") + + val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] + + val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] + + works.joinWith(authors, authors("oid").equalTo(works("oid"))) + .map(i =>{ + val doi = i._1.doi + val author = i._2 + (doi, author) + }).groupBy(col("_1").alias("doi")) + .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] + .map(s => fixORCIDItem(s)) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") + } + + def main(args: Array[String]): Unit = { + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") + + run(spark, sourcePath, workingPath) + + } + +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index a72e4b0d6e..4530926f10 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -18,7 +18,7 @@ object SparkMapUnpayWallToOAF { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json new file mode 100644 index 0000000000..da324f8c45 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_crossref_dump_to_oaf_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json new file mode 100644 index 0000000000..6c9ca5ede1 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json new file mode 100644 index 0000000000..da324f8c45 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_uw_to_oaf_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the OAF Orcid transformed", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path ", "paramRequired": false}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index fa47e142d0..34b4b5c5e3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -368,7 +368,7 @@ yarn-cluster cluster Convert ORCID to Dataset - eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF + eu.dnetlib.doiboost.orcid.SparkPreprocessORCID dhp-doiboost-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_oaf/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_oaf/oozie_app/workflow.xml index bffde793ba..0670e18de4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_oaf/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_oaf/oozie_app/workflow.xml @@ -34,7 +34,7 @@ yarn-cluster cluster Convert ORCID to Dataset - eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF + eu.dnetlib.doiboost.orcid.SparkPreprocessORCID dhp-doiboost-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json similarity index 59% rename from dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json rename to dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json index 1521035705..fdc1e2f20e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json @@ -1,7 +1,6 @@ [ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the Orcid Input file", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": false}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, - {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala index 7628fb8535..076a01526f 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -46,7 +46,7 @@ class MappingORCIDToOAFTest { implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] import spark.implicits._ - SparkConvertORCIDToOAF.run( spark,sourcePath, workingPath, targetPath) + SparkPreprocessORCID.run( spark,sourcePath, workingPath) val mapper = new ObjectMapper()