diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
index 40a17b486..ed6853229 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/preprocess/oozie_app/workflow.xml
@@ -133,32 +133,6 @@
                    <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
                    <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
-
-
-                <spark xmlns="uri:oozie:spark-action:0.2">
-                    <master>yarn-cluster</master>
-                    <mode>cluster</mode>
-                    <name>Convert ORCID to Dataset</name>
-                    <class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
-                    <jar>dhp-doiboost-${projectVersion}.jar</jar>
-                    <spark-opts>
-                        --executor-memory=${sparkExecutorMemory}
-                        --executor-cores=${sparkExecutorCores}
-                        --driver-memory=${sparkDriverMemory}
-                        --conf spark.sql.shuffle.partitions=3840
-                        --conf spark.extraListeners=${spark2ExtraListeners}
-                        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    </spark-opts>
-                    <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
-                    <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
-                    <arg>--master</arg><arg>yarn-cluster</arg>
-                </spark>
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
index 29a12f4df..8f28d706d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
@@ -59,10 +59,10 @@
-                    <property>
-                        <name>workingPathOrcid</name>
-                        <description>the ORCID working path</description>
-                    </property>
+
+
+
+
@@ -170,32 +170,6 @@
                    <arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
                    <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
-
-
-                <spark xmlns="uri:oozie:spark-action:0.2">
-                    <master>yarn-cluster</master>
-                    <mode>cluster</mode>
-                    <name>Convert ORCID to Dataset</name>
-                    <class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
-                    <jar>dhp-doiboost-${projectVersion}.jar</jar>
-                    <spark-opts>
-                        --executor-memory=${sparkExecutorMemory}
-                        --executor-cores=${sparkExecutorCores}
-                        --driver-memory=${sparkDriverMemory}
-                        --conf spark.sql.shuffle.partitions=3840
-                        --conf spark.extraListeners=${spark2ExtraListeners}
-                        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    </spark-opts>
-                    <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
-                    <arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
-                    <arg>--master</arg><arg>yarn-cluster</arg>
-                </spark>
diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index 2cbd53097..07d6a0287 100644
--- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -66,7 +66,7 @@ object SparkGenerateDoiBoost {
Encoders.tuple(Encoders.STRING, mapEncoderPub)
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
- logger.info("Phase 2) Join Crossref with UnpayWall")
+ logger.info("Phase 1) Join Crossref with UnpayWall")
val crossrefPublication: Dataset[(String, Publication)] =
spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
@@ -91,20 +91,11 @@ object SparkGenerateDoiBoost {
.write
.mode(SaveMode.Overwrite)
.save(s"$workingDirPath/firstJoin")
- logger.info("Phase 3) Join Result with ORCID")
- val fj: Dataset[(String, Publication)] =
- spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
- val orcidPublication: Dataset[(String, Publication)] =
- spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
- fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left")
- .map(applyMerge)
- .write
- .mode(SaveMode.Overwrite)
- .save(s"$workingDirPath/secondJoin")
- logger.info("Phase 4) Join Result with MAG")
+
+ logger.info("Phase 2) Join Result with MAG")
val sj: Dataset[(String, Publication)] =
- spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
+ spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
val magPublication: Dataset[(String, Publication)] =
spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
index 1284cceda..87c4dcb4f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
@@ -40,7 +40,7 @@
                    <arg>--orcidPath</arg><arg>${orcidPath}</arg>
                    <arg>--targetPath</arg><arg>${targetPath}</arg>
-                    <arg>--graphPath</arg><arg>${graphPath}/publication</arg>
+                    <arg>--graphPath</arg><arg>${graphPath}</arg>
                    <arg>--master</arg><arg>yarn</arg>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
index a67de4b95..15513c8af 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
@@ -1,5 +1,6 @@
package eu.dnetlib.dhp.enrich.orcid
+import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
import eu.dnetlib.dhp.schema.sx.OafUtils
import org.apache.spark.sql.Row
@@ -13,9 +14,11 @@ object AuthorEnricher extends Serializable {
a.setName(givenName)
a.setSurname(familyName)
a.setFullname(s"$givenName $familyName")
- a.setPid(List(OafUtils.createSP(orcid, "ORCID", "ORCID")).asJava)
+ val pid = OafUtils.createSP(orcid, ModelConstants.ORCID, ModelConstants.ORCID)
+ pid.setDataInfo(OafUtils.generateDataInfo())
+ pid.getDataInfo.setProvenanceaction(OafUtils.createQualifier("ORCID_ENRICHMENT", "ORCID_ENRICHMENT"))
+ a.setPid(List(pid).asJava)
a
-
}
def toOAFAuthor(r: Row): java.util.List[Author] = {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
index 3c9e04a21..0d994d202 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
@@ -1,13 +1,11 @@
package eu.dnetlib.dhp.enrich.orcid
-import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.oa.merge.AuthorMerger
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Publication, StructuredProperty}
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.functions.{col, collect_set, concat, explode, expr, first, flatten, lower, size, struct}
+import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.spark.sql.types._
class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
@@ -22,33 +20,49 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
log.info(s"orcidPath is '$orcidPath'")
val targetPath = parser.get("targetPath")
log.info(s"targetPath is '$targetPath'")
- enrichResult(spark, graphPath, orcidPath, targetPath)
+ val orcidPublication: Dataset[Row] = generateOrcidTable(spark, orcidPath)
+ enrichResult(
+ spark,
+ s"$graphPath/publication",
+ orcidPublication,
+ s"$targetPath/publication",
+ Encoders.bean(classOf[Publication])
+ )
+ enrichResult(
+ spark,
+ s"$graphPath/dataset",
+ orcidPublication,
+ s"$targetPath/dataset",
+ Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset])
+ )
+ enrichResult(
+ spark,
+ s"$graphPath/software",
+ orcidPublication,
+ s"$targetPath/software",
+ Encoders.bean(classOf[Software])
+ )
+ enrichResult(
+ spark,
+ s"$graphPath/otherresearchproduct",
+ orcidPublication,
+ s"$targetPath/otherresearchproduct",
+ Encoders.bean(classOf[OtherResearchProduct])
+ )
}
- def enrichResult(spark: SparkSession, graphPath: String, orcidPath: String, outputPath: String): Unit = {
- val orcidPublication = generateOrcidTable(spark, orcidPath)
+ private def enrichResult[T <: Result](
+ spark: SparkSession,
+ graphPath: String,
+ orcidPublication: Dataset[Row],
+ outputPath: String,
+ enc: Encoder[T]
+ ): Unit = {
-
- implicit val publicationEncoder = Encoders.bean(classOf[Publication])
-
- val aschema = new StructType()
- .add("id", StringType)
- .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema)
- .add(
- "author",Encoders.bean(classOf[Author]).schema
-
- )
-
- val schema = new StructType()
- .add("id", StringType)
- .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema)
- .add(
- "instance",
- ArrayType(new StructType().add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema)))
- )
val entities = spark.read
- .schema(schema)
+ .schema(enc.schema)
.json(graphPath)
+ .select(col("id"), col("datainfo"), col("instance"))
.where("datainfo.deletedbyinference = false")
.drop("datainfo")
.withColumn("instances", explode(col("instance")))
@@ -58,7 +72,8 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
col("pids.value").alias("pid_value"),
col("id").alias("dnet_id")
)
- val orcidDnet = orcidPublication
+
+ val orcidDnet = orcidPublication
.join(
entities,
lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
@@ -69,36 +84,25 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
.agg(collect_set(orcidPublication("author")).alias("orcid_authors"))
.select("dnet_id", "orcid_authors")
.cache()
+ orcidDnet.count()
+ val result = spark.read.schema(enc.schema).json(graphPath).as[T](enc)
-
- orcidPublication
- .join(
- entities,
- lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
- lower(col("value")).equalTo(lower(col("pid_value"))),
- "inner"
- )
- .groupBy(col("dnet_id")).agg(collect_set(struct(col("pid_schema"), col("pid_value")))).write.mode("Overwrite").save("/user/sandro.labruzzo/enrich_pub")
-
- val publication = spark.read.schema(publicationEncoder.schema).json(graphPath).as[Publication]
-
- publication
- .joinWith(orcidDnet, publication("id").equalTo(orcidDnet("dnet_id")), "left")
+ result
+ .joinWith(orcidDnet, result("id").equalTo(orcidDnet("dnet_id")), "left")
.map {
- case (p: Publication, null) => {
- p
- }
- case (p: Publication, r: Row) =>
+ case (r: T, null) =>
+ r
+ case (p: T, r: Row) =>
p.setAuthor(AuthorMerger.enrichOrcid(p.getAuthor, AuthorEnricher.toOAFAuthor(r)))
p
- }
+ }(enc)
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath)
}
- def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = {
+ private def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = {
val orcidAuthors =
spark.read.load(s"$inputPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
val orcidWorks = spark.read
@@ -107,14 +111,14 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
.where(
"identifier.schema = 'doi' or identifier.schema ='pmid' or identifier.schema ='pmc' or identifier.schema ='arxiv' or identifier.schema ='handle'"
)
- val orcidPublication =orcidAuthors
+ val orcidPublication = orcidAuthors
.join(orcidWorks, orcidAuthors("orcid").equalTo(orcidWorks("orcid")))
.select(
col("identifier.schema").alias("schema"),
col("identifier.value").alias("value"),
struct(orcidAuthors("orcid").alias("orcid"), col("givenName"), col("familyName")).alias("author")
)
- orcidPublication
+ orcidPublication.cache()
}
}
@@ -123,10 +127,8 @@ object SparkEnrichGraphWithOrcidAuthors {
val log: Logger = LoggerFactory.getLogger(SparkEnrichGraphWithOrcidAuthors.getClass)
def main(args: Array[String]): Unit = {
-
new SparkEnrichGraphWithOrcidAuthors("/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json", args, log)
.initialize()
.run()
-
}
}
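
Note (not part of the patch): the reworked enrichResult hinges on reading each graph entity with the schema taken from its bean Encoder, so a single generic method serves Publication, Dataset, Software and OtherResearchProduct. A short sketch of that read pattern, with hypothetical paths:

import eu.dnetlib.dhp.schema.oaf.{Publication, Result}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object GenericReadSketch {

  // Read any Result subtype from line-delimited JSON using its bean encoder:
  // the encoder supplies both the parsing schema and the typed view.
  def readTyped[T <: Result](spark: SparkSession, path: String, enc: Encoder[T]): Dataset[T] =
    spark.read.schema(enc.schema).json(path).as[T](enc)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("generic-read-sketch").getOrCreate()
    // Hypothetical path; in the workflow this would be s"$graphPath/publication".
    val pubs = readTyped(spark, "/tmp/graph/publication", Encoders.bean(classOf[Publication]))
    println(s"publications: ${pubs.count()}")
    spark.stop()
  }
}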
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala
index f58b06318..84483b1a2 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala
@@ -1,78 +1,85 @@
package eu.dnetlib.dhp.enrich.orcid
-import eu.dnetlib.dhp.schema.oaf.Publication
+
+import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
import org.apache.spark.sql.{Column, Encoder, Encoders, Row, SparkSession}
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
-case class Pid(pidScheme: String, pidValue: String) {}
-
-case class AuthorPid(fullName: String, pids: List[Pid]) {}
-
-case class PubSummary(id: String, authorWithPids: List[AuthorPid])
-
class EnrichOrcidTest {
val log: Logger = LoggerFactory.getLogger(getClass)
- def orcid_intersection_wrong(p: PubSummary): PubSummary = {
-
- if (p.authorWithPids.isEmpty)
- null
- else {
- val incorrectAuthor = p.authorWithPids.filter(a => a.pids.filter(p => p.pidScheme != null && p.pidScheme.toLowerCase.contains("orcid")).map(p => p.pidValue.toLowerCase).distinct.size > 1)
- if (incorrectAuthor.nonEmpty) {
- PubSummary(p.id, incorrectAuthor)
- }
- else {
- null
- }
- }
- }
-
-
def test() = {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
- spark.sparkContext.setLogLevel("ERROR")
+// spark.sparkContext.setLogLevel("ERROR")
+
+// new SparkEnrichGraphWithOrcidAuthors(null, null, null)
+// .enrichResult(
+// spark,
+// "/Users/sandro/orcid_test/publication",
+// "",
+// "/tmp/graph/",
+// Encoders.bean(classOf[Publication])
+// )
val schema = Encoders.bean(classOf[Publication]).schema
+//
+// val simplifyAuthor = udf((r: Seq[Row]) => {
+// r
+// .map(k =>
+// AuthorPid(
+// k.getAs[String]("fullname"),
+// k.getAs[Seq[Row]]("pid")
+// .map(p => Pid(p.getAs[Row]("qualifier").getAs[String]("classid"), p.getAs[String]("value")))
+// .toList
+// )
+// )
+// .filter(l => l.pids.nonEmpty)
+// .toList
+// })
+//
+// val wrong_orcid_intersection = udf((a: Seq[Row]) => {
+// a.map(author => {
+// val pids_with_orcid: Seq[Row] = author
+// .getAs[Seq[Row]]("pids")
+// .filter(p =>
+// p.getAs[String]("pidScheme") != null && p.getAs[String]("pidScheme").toLowerCase.contains("orcid")
+// )
+// if (pids_with_orcid.exists(p => p.getAs[String]("pidScheme").equals("ORCID"))) {
+// if (pids_with_orcid.map(p => p.getAs[String]("pidValue").toLowerCase).distinct.size > 1) {
+// AuthorPid(
+// author.getAs[String]("fullName"),
+// pids_with_orcid.map(p => Pid(p.getAs[String]("pidScheme"), p.getAs[String]("pidValue"))).toList
+// )
+//
+// } else
+// null
+// } else
+// null
+// }).filter(author => author != null)
+// })
+
+
+ Encoders
+ import spark.implicits._
+
+// val enriched = spark.read
+// .schema(schema)
+// .json("/Users/sandro/orcid_test/publication_enriched")
+// .select(col("id"), explode(col("author")).as("authors"))
+// .withColumn("ap", col("authors.pid.qualifier.classid"))
+// .withColumn("dp", col("authors.pid.datainfo.provenanceAction.classid"))
+//
+// .show()
- val simplifyAuthor = udf((r: Seq[Row]) => {
- r
- .map(k =>
- AuthorPid(k.getAs[String]("fullname"),
- k.getAs[Seq[Row]]("pid")
- .map(
- p => Pid(p.getAs[Row]("qualifier").getAs[String]("classid"), p.getAs[String]("value"))
- ).toList)
- ).filter(l => l.pids.nonEmpty)
- .toList
- }
- )
- val wrong_orcid_intersection = udf((a: Seq[Row]) => {
- a.map(author => {
- val pids_with_orcid: Seq[Row] = author.getAs[Seq[Row]]("pids").filter(p => p.getAs[String]("pidScheme")!= null && p.getAs[String]("pidScheme").toLowerCase.contains("orcid"))
- if (pids_with_orcid.exists(p => p.getAs[String]("pidScheme").equals("ORCID"))) {
- if (pids_with_orcid.map(p => p.getAs[String]("pidValue").toLowerCase).distinct.size > 1) {
- AuthorPid(author.getAs[String]("fullName"),pids_with_orcid.map(p => Pid(p.getAs[String]("pidScheme"),p.getAs[String]("pidValue"))).toList )
- }
- else
- null
- } else
- null
- }).filter(author => author != null)
- })
- val enriched = spark.read.schema(schema).json("/Users/sandro/orcid_test/publication_enriched").select(col("id"), simplifyAuthor(col("author")).alias("authors"))
- .select(col("id"), wrong_orcid_intersection(col("authors")).alias("wi")).where("wi is not null")
- enriched.show(20, 1000, true)
}
-
}