From 7b5e04f37ec57971fbd588a9a6cca0314d06b045 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 30 Nov 2023 14:36:50 +0100
Subject: [PATCH] removed Orcid intersection on DOIBoost

---
 .../preprocess/oozie_app/workflow.xml         |  26 ----
 .../doiboost/process/oozie_app/workflow.xml   |  34 +-----
 .../doiboost/SparkGenerateDoiBoost.scala      |  17 +--
 .../dhp/enrich/orcid/oozie_app/workflow.xml   |   2 +-
 .../dhp/enrich/orcid/AuthorEnricher.scala     |   7 +-
 .../SparkEnrichGraphWithOrcidAuthors.scala    | 106 ++++++++--------
 .../dhp/enrich/orcid/EnrichOrcidTest.scala    | 113 ++++++++++--------
 7 files changed, 128 insertions(+), 177 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
index 40a17b486..ed6853229 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
@@ -133,32 +133,6 @@
             <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert ORCID to Dataset</name>
-            <class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
-            <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-

diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
index 29a12f4df..8f28d706d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
@@ -59,10 +59,10 @@
-        <property>
-            <name>workingPathOrcid</name>
-            <description>the ORCID working path</description>
-        </property>
+
+
+
+
@@ -170,32 +170,6 @@
             <arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert ORCID to Dataset</name>
-            <class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
-            <arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-

diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index 2cbd53097..07d6a0287 100644
--- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -66,7 +66,7 @@ object SparkGenerateDoiBoost {
       Encoders.tuple(Encoders.STRING, mapEncoderPub)
     implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
 
-    logger.info("Phase 2) Join Crossref with UnpayWall")
+    logger.info("Phase 1) Join Crossref with UnpayWall")
 
     val crossrefPublication: Dataset[(String, Publication)] =
       spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
@@ -91,20 +91,11 @@ object SparkGenerateDoiBoost {
       .write
       .mode(SaveMode.Overwrite)
       .save(s"$workingDirPath/firstJoin")
-    logger.info("Phase 3) Join Result with ORCID")
-    val fj: Dataset[(String, Publication)] =
-      spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
-    val orcidPublication: Dataset[(String, Publication)] =
-      spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
-    fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left")
-      .map(applyMerge)
-      .write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingDirPath/secondJoin")
-    logger.info("Phase 4) Join Result with MAG")
+
+    logger.info("Phase 2) Join Result with MAG")
     val sj: Dataset[(String, Publication)] =
-      spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
+      spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
     val magPublication: Dataset[(String, Publication)] =
       spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
index 1284cceda..87c4dcb4f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
@@ -40,7 +40,7 @@
             <arg>--orcidPath</arg><arg>${orcidPath}</arg>
             <arg>--targetPath</arg><arg>${targetPath}</arg>
-            <arg>--graphPath</arg><arg>${graphPath}/publication</arg>
+            <arg>--graphPath</arg><arg>${graphPath}</arg>
             <arg>--master</arg><arg>yarn</arg>
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
index a67de4b95..15513c8af 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.enrich.orcid
 
+import eu.dnetlib.dhp.schema.common.ModelConstants
 import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
 import eu.dnetlib.dhp.schema.sx.OafUtils
 import org.apache.spark.sql.Row
@@ -13,9 +14,11 @@ object AuthorEnricher extends Serializable {
     a.setName(givenName)
     a.setSurname(familyName)
     a.setFullname(s"$givenName $familyName")
-    a.setPid(List(OafUtils.createSP(orcid, "ORCID", "ORCID")).asJava)
+    val pid = OafUtils.createSP(orcid, ModelConstants.ORCID, ModelConstants.ORCID)
+    pid.setDataInfo(OafUtils.generateDataInfo())
+    pid.getDataInfo.setProvenanceaction(OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT"))
+    a.setPid(List(pid).asJava)
     a
-
   }
 
   def toOAFAuthor(r: Row): java.util.List[Author] = {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala index 3c9e04a21..0d994d202 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala @@ -1,13 +1,11 @@ package eu.dnetlib.dhp.enrich.orcid -import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.oa.merge.AuthorMerger -import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Publication, StructuredProperty} -import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SaveMode, SparkSession} -import org.apache.spark.sql.functions.{col, collect_set, concat, explode, expr, first, flatten, lower, size, struct} +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} -import org.apache.spark.sql.types._ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger) extends AbstractScalaApplication(propertyPath, args, log: Logger) { @@ -22,33 +20,49 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String] log.info(s"orcidPath is '$orcidPath'") val targetPath = parser.get("targetPath") log.info(s"targetPath is '$targetPath'") - enrichResult(spark, graphPath, orcidPath, targetPath) + val orcidPublication: Dataset[Row] = generateOrcidTable(spark, orcidPath) + enrichResult( + spark, + s"$graphPath/publication", + orcidPublication, + s"$targetPath/publication", + Encoders.bean(classOf[Publication]) + ) + enrichResult( + spark, + s"$graphPath/dataset", + orcidPublication, + s"$targetPath/dataset", + Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) + ) + enrichResult( + spark, + s"$graphPath/software", + orcidPublication, + s"$targetPath/software", + Encoders.bean(classOf[Software]) + ) + enrichResult( + spark, + s"$graphPath/otherresearchproduct", + orcidPublication, + s"$targetPath/otherresearchproduct", + Encoders.bean(classOf[OtherResearchProduct]) + ) } - def enrichResult(spark: SparkSession, graphPath: String, orcidPath: String, outputPath: String): Unit = { - val orcidPublication = generateOrcidTable(spark, orcidPath) + private def enrichResult[T <: Result]( + spark: SparkSession, + graphPath: String, + orcidPublication: Dataset[Row], + outputPath: String, + enc: Encoder[T] + ): Unit = { - - implicit val publicationEncoder = Encoders.bean(classOf[Publication]) - - val aschema = new StructType() - .add("id", StringType) - .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema) - .add( - "author",Encoders.bean(classOf[Author]).schema - - ) - - val schema = new StructType() - .add("id", StringType) - .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema) - .add( - "instance", - ArrayType(new StructType().add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema))) - ) val entities = spark.read - .schema(schema) + .schema(enc.schema) .json(graphPath) + .select(col("id"), col("datainfo"), col("instance")) .where("datainfo.deletedbyinference = false") .drop("datainfo") .withColumn("instances", explode(col("instance"))) @@ -58,7 +72,8 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String] col("pids.value").alias("pid_value"), col("id").alias("dnet_id") ) - val orcidDnet 
= orcidPublication + + val orcidDnet = orcidPublication .join( entities, lower(col("schema")).equalTo(lower(col("pid_schema"))) && @@ -69,36 +84,25 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String] .agg(collect_set(orcidPublication("author")).alias("orcid_authors")) .select("dnet_id", "orcid_authors") .cache() + orcidDnet.count() + val result = spark.read.schema(enc.schema).json(graphPath).as[T](enc) - - orcidPublication - .join( - entities, - lower(col("schema")).equalTo(lower(col("pid_schema"))) && - lower(col("value")).equalTo(lower(col("pid_value"))), - "inner" - ) - .groupBy(col("dnet_id")).agg(collect_set(struct(col("pid_schema"), col("pid_value")))).write.mode("Overwrite").save("/user/sandro.labruzzo/enrich_pub") - - val publication = spark.read.schema(publicationEncoder.schema).json(graphPath).as[Publication] - - publication - .joinWith(orcidDnet, publication("id").equalTo(orcidDnet("dnet_id")), "left") + result + .joinWith(orcidDnet, result("id").equalTo(orcidDnet("dnet_id")), "left") .map { - case (p: Publication, null) => { - p - } - case (p: Publication, r: Row) => + case (r: T, null) => + r + case (p: T, r: Row) => p.setAuthor(AuthorMerger.enrichOrcid(p.getAuthor, AuthorEnricher.toOAFAuthor(r))) p - } + }(enc) .write .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath) } - def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = { + private def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = { val orcidAuthors = spark.read.load(s"$inputPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames") val orcidWorks = spark.read @@ -107,14 +111,14 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String] .where( "identifier.schema = 'doi' or identifier.schema ='pmid' or identifier.schema ='pmc' or identifier.schema ='arxiv' or identifier.schema ='handle'" ) - val orcidPublication =orcidAuthors + val orcidPublication = orcidAuthors .join(orcidWorks, orcidAuthors("orcid").equalTo(orcidWorks("orcid"))) .select( col("identifier.schema").alias("schema"), col("identifier.value").alias("value"), struct(orcidAuthors("orcid").alias("orcid"), col("givenName"), col("familyName")).alias("author") ) - orcidPublication + orcidPublication.cache() } } @@ -123,10 +127,8 @@ object SparkEnrichGraphWithOrcidAuthors { val log: Logger = LoggerFactory.getLogger(SparkEnrichGraphWithOrcidAuthors.getClass) def main(args: Array[String]): Unit = { - new SparkEnrichGraphWithOrcidAuthors("/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json", args, log) .initialize() .run() - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala index f58b06318..84483b1a2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala @@ -1,78 +1,85 @@ package eu.dnetlib.dhp.enrich.orcid -import eu.dnetlib.dhp.schema.oaf.Publication + +import eu.dnetlib.dhp.schema.oaf.{Author, Publication} import org.apache.spark.sql.{Column, Encoder, Encoders, Row, SparkSession} import org.junit.jupiter.api.Test import org.slf4j.{Logger, LoggerFactory} import org.apache.spark.sql.functions._ -case class Pid(pidScheme: String, pidValue: String) {} - -case class AuthorPid(fullName: 
String, pids: List[Pid]) {} - -case class PubSummary(id: String, authorWithPids: List[AuthorPid]) - class EnrichOrcidTest { val log: Logger = LoggerFactory.getLogger(getClass) - def orcid_intersection_wrong(p: PubSummary): PubSummary = { - - if (p.authorWithPids.isEmpty) - null - else { - val incorrectAuthor = p.authorWithPids.filter(a => a.pids.filter(p => p.pidScheme != null && p.pidScheme.toLowerCase.contains("orcid")).map(p => p.pidValue.toLowerCase).distinct.size > 1) - if (incorrectAuthor.nonEmpty) { - PubSummary(p.id, incorrectAuthor) - } - else { - null - } - } - } - - def test() = { val spark = SparkSession.builder().master("local[*]").getOrCreate() - spark.sparkContext.setLogLevel("ERROR") +// spark.sparkContext.setLogLevel("ERROR") + +// new SparkEnrichGraphWithOrcidAuthors(null, null, null) +// .enrichResult( +// spark, +// "/Users/sandro/orcid_test/publication", +// "", +// "/tmp/graph/", +// Encoders.bean(classOf[Publication]) +// ) val schema = Encoders.bean(classOf[Publication]).schema +// +// val simplifyAuthor = udf((r: Seq[Row]) => { +// r +// .map(k => +// AuthorPid( +// k.getAs[String]("fullname"), +// k.getAs[Seq[Row]]("pid") +// .map(p => Pid(p.getAs[Row]("qualifier").getAs[String]("classid"), p.getAs[String]("value"))) +// .toList +// ) +// ) +// .filter(l => l.pids.nonEmpty) +// .toList +// }) +// +// val wrong_orcid_intersection = udf((a: Seq[Row]) => { +// a.map(author => { +// val pids_with_orcid: Seq[Row] = author +// .getAs[Seq[Row]]("pids") +// .filter(p => +// p.getAs[String]("pidScheme") != null && p.getAs[String]("pidScheme").toLowerCase.contains("orcid") +// ) +// if (pids_with_orcid.exists(p => p.getAs[String]("pidScheme").equals("ORCID"))) { +// if (pids_with_orcid.map(p => p.getAs[String]("pidValue").toLowerCase).distinct.size > 1) { +// AuthorPid( +// author.getAs[String]("fullName"), +// pids_with_orcid.map(p => Pid(p.getAs[String]("pidScheme"), p.getAs[String]("pidValue"))).toList +// ) +// +// } else +// null +// } else +// null +// }).filter(author => author != null) +// }) + + + Encoders + import spark.implicits._ + +// val enriched = spark.read +// .schema(schema) +// .json("/Users/sandro/orcid_test/publication_enriched") +// .select(col("id"), explode(col("author")).as("authors")) +// .withColumn("ap", col("authors.pid.qualifier.classid")) +// .withColumn("dp", col("authors.pid.datainfo.provenanceAction.classid")) +// +// .show() - val simplifyAuthor = udf((r: Seq[Row]) => { - r - .map(k => - AuthorPid(k.getAs[String]("fullname"), - k.getAs[Seq[Row]]("pid") - .map( - p => Pid(p.getAs[Row]("qualifier").getAs[String]("classid"), p.getAs[String]("value")) - ).toList) - ).filter(l => l.pids.nonEmpty) - .toList - } - ) - val wrong_orcid_intersection = udf((a: Seq[Row]) => { - a.map(author => { - val pids_with_orcid: Seq[Row] = author.getAs[Seq[Row]]("pids").filter(p => p.getAs[String]("pidScheme")!= null && p.getAs[String]("pidScheme").toLowerCase.contains("orcid")) - if (pids_with_orcid.exists(p => p.getAs[String]("pidScheme").equals("ORCID"))) { - if (pids_with_orcid.map(p => p.getAs[String]("pidValue").toLowerCase).distinct.size > 1) { - AuthorPid(author.getAs[String]("fullName"),pids_with_orcid.map(p => Pid(p.getAs[String]("pidScheme"),p.getAs[String]("pidValue"))).toList ) - } - else - null - } else - null - }).filter(author => author != null) - }) - val enriched = spark.read.schema(schema).json("/Users/sandro/orcid_test/publication_enriched").select(col("id"), simplifyAuthor(col("author")).alias("authors")) - .select(col("id"), 
wrong_orcid_intersection(col("authors")).alias("wi")).where("wi is not null") - enriched.show(20, 1000, true) } - }
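
For reference, a minimal sketch of the keyed left-join pattern that remains in SparkGenerateDoiBoost once the ORCID phase is removed (Phase 1 joins Crossref with UnpayWall, Phase 2 joins the result with MAG). The merge body is elided because applyMerge lies outside the hunks above; the paths, the object name, and the merge comment are illustrative, not part of the patch.

    import eu.dnetlib.dhp.schema.oaf.Publication
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}

    object DoiBoostJoinPhaseSketch {
      // One DOIBoost phase: left-join two keyed Publication datasets and merge.
      def joinPhase(spark: SparkSession, leftPath: String, rightPath: String, outPath: String): Unit = {
        implicit val pubEnc: Encoder[Publication] = Encoders.kryo[Publication]
        implicit val keyedEnc: Encoder[(String, Publication)] =
          Encoders.tuple(Encoders.STRING, pubEnc)

        // Key both sides by the publication identifier, as the patched code does.
        val left: Dataset[(String, Publication)] =
          spark.read.load(leftPath).as[Publication].map(p => (p.getId, p))
        val right: Dataset[(String, Publication)] =
          spark.read.load(rightPath).as[Publication].map(p => (p.getId, p))

        left
          .joinWith(right, left("_1").equalTo(right("_1")), "left")
          .map { pair =>
            // In the real job applyMerge folds the right record into the left
            // one; here the merge body is elided and the left record is kept.
            pair._1._2
          }(pubEnc)
          .write
          .mode(SaveMode.Overwrite)
          .save(outPath)
      }
    }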
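
The core of the new generic enrichment is the PID-based join in enrichResult. Below is a minimal, self-contained sketch of that join on plain DataFrames, assuming the ORCID table layout produced by generateOrcidTable (schema, value, author) and hypothetical input paths; pid_schema is taken from pids.qualifier.classid, which the omitted hunk context is assumed to select.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._

    object OrcidEnrichJoinSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("orcid-enrich-sketch").getOrCreate()

        // ORCID side: one row per work identifier, carrying the author struct.
        val orcidTable: DataFrame = spark.read.load("/tmp/orcid_table") // hypothetical path

        // Graph side: one row per (pid_schema, pid_value) of each non-deleted
        // result, obtained by exploding instance.pid, as enrichResult does.
        val entities: DataFrame = spark.read
          .json("/tmp/graph/publication") // hypothetical path
          .where("datainfo.deletedbyinference = false")
          .withColumn("instances", explode(col("instance")))
          .withColumn("pids", explode(col("instances.pid")))
          .select(
            col("pids.qualifier.classid").alias("pid_schema"),
            col("pids.value").alias("pid_value"),
            col("id").alias("dnet_id")
          )

        // Case-insensitive equi-join on the PID, then collect all ORCID authors
        // per result id: this is the table later left-joined back on "id".
        val orcidDnet = orcidTable
          .join(
            entities,
            lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
              lower(col("value")).equalTo(lower(col("pid_value"))),
            "inner"
          )
          .groupBy(col("dnet_id"))
          .agg(collect_set(col("author")).alias("orcid_authors"))

        orcidDnet.show(20, truncate = false)
        spark.stop()
      }
    }

In the patch this joined table is cached and counted once, so it is materialized a single time before being reused against all four result types (publication, dataset, software, otherresearchproduct).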
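
Finally, a hedged sketch of how the enriched output can be checked, along the lines of the commented-out assertions in EnrichOrcidTest: count the author pids whose provenance action is the ORCID_ENRICHMENT qualifier set by AuthorEnricher. The input path is hypothetical, and the pid classid is matched case-insensitively because the exact casing of ModelConstants.ORCID is not shown in the patch.

    import eu.dnetlib.dhp.schema.oaf.Publication
    import org.apache.spark.sql.{Encoders, SparkSession}
    import org.apache.spark.sql.functions._

    object EnrichedOutputCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        val schema = Encoders.bean(classOf[Publication]).schema

        // Flatten publications to one row per author pid.
        val pids = spark.read
          .schema(schema)
          .json("/tmp/enriched/publication") // hypothetical path
          .select(col("id"), explode(col("author")).as("a"))
          .select(col("id"), explode(col("a.pid")).as("p"))

        // Keep only ORCID pids added by the enrichment workflow.
        val added = pids.where(
          lower(col("p.qualifier.classid")).equalTo("orcid") &&
            col("p.datainfo.provenanceaction.classid").equalTo("ORCID_ENRICHMENT")
        )

        println(s"author pids added by ORCID enrichment: ${added.count()}")
        spark.stop()
      }
    }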