From be79d74e3d7993e8dc160d89ccd0b182dec1b139 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 27 Sep 2021 16:57:04 +0200 Subject: [PATCH] Fixed DoiBoost generation to point to correct organization in affiliation relation --- .../doiboost/SparkGenerateDoiBoost.scala | 51 ++++++++++++++++++- .../doiboost/generate_doiboost_params.json | 3 +- .../doiboost/process/oozie_app/workflow.xml | 7 +++ 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index b77de13b9d..e501b48233 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -13,10 +13,30 @@ import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString,JArray} +import org.json4s.jackson.JsonMethods.parse object SparkGenerateDoiBoost { + def extractIdGRID(input:String):List[(String,String)] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: org.json4s.JValue = parse(input) + + val id:String = (json \ "id").extract[String] + + val grids:List[String] = for { + + JObject(pid) <- json \ "pid" + JField("qualifier", JObject(qualifier)) <- pid + JField("classid", JString(classid)) <-qualifier + JField("value", JString(vl)) <- pid + if classid == "GRID" + } yield vl + grids.map(g => (id, s"unresolved::grid::${g.toLowerCase}"))(collection.breakOut) + } + def main(args: Array[String]): Unit = { @@ -36,6 +56,7 @@ object SparkGenerateDoiBoost { val hostedByMapPath = parser.get("hostedByMapPath") val workingDirPath = parser.get("workingPath") + val openaireOrganizationPath = parser.get("openaireOrganizationPath") val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { override def zero: Publication = new Publication @@ -156,7 +177,7 @@ object SparkGenerateDoiBoost { magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => { val pub:Publication = item._1._2 val affiliation = item._2 - val affId:String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) + val affId:String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) val r:Relation = new Relation r.setSource(pub.getId) r.setTarget(affId) @@ -174,9 +195,35 @@ object SparkGenerateDoiBoost { r1.setDataInfo(pub.getDataInfo) r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) List(r, r1) - })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") + })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved") + + + val unresolvedRels:Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => { + + if (r.getSource.startsWith("unresolved")) + (r.getSource, r) + else if (r.getTarget.startsWith("unresolved")) + (r.getTarget,r) + else + ("resolved", r) + }) + + val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2) + + unresolvedRels.joinWith(openaireOrganization,unresolvedRels("_1").equalTo(openaireOrganization("_2"))) + .map { x => + val currentRels = x._1._2 + val currentOrgs = x._2 + if (currentOrgs!= null) + if(currentRels.getSource.startsWith("unresolved")) + currentRels.setSource(currentOrgs._1) + else + currentRels.setTarget(currentOrgs._1) + currentRels + }.write.save(s"$workingDirPath/doiBoostPublicationAffiliation") + magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => { val affiliation = item._2 if (affiliation.GridId.isEmpty) { diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json index 1ff63dd0ec..39455fb677 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json @@ -1,7 +1,8 @@ [ {"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}, {"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true}, + {"paramName": "oo", "paramLongName":"openaireOrganizationPath", "paramDescription": "the openaire Organization Path", "paramRequired": true}, {"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true}, {"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true}, - {"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true} + {"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true} ] diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml index f5596b60ee..eb82c3a7d6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml @@ -27,6 +27,12 @@ hostedByMapPath the hostedByMap Path + + openaireOrganizationPath + the OpenAire Organizations Path + + + outputPath the Path of the sequence file action set @@ -214,6 +220,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --hostedByMapPath${hostedByMapPath} + --openaireOrganizationPath${openaireOrganizationPath} --affiliationPath${inputPathMAG}/dataset/Affiliations --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations --workingPath${workingPath}