diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index b77de13b9d..e501b48233 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -13,10 +13,30 @@ import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.{JField, JObject, JString,JArray}
+import org.json4s.jackson.JsonMethods.parse
object SparkGenerateDoiBoost {
+ def extractIdGRID(input:String):List[(String,String)] = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: org.json4s.JValue = parse(input)
+
+ val id:String = (json \ "id").extract[String]
+
+ val grids:List[String] = for {
+
+ JObject(pid) <- json \ "pid"
+ JField("qualifier", JObject(qualifier)) <- pid
+ JField("classid", JString(classid)) <-qualifier
+ JField("value", JString(vl)) <- pid
+ if classid == "GRID"
+ } yield vl
+ grids.map(g => (id, s"unresolved::grid::${g.toLowerCase}"))(collection.breakOut)
+ }
+
def main(args: Array[String]): Unit = {
@@ -36,6 +56,7 @@ object SparkGenerateDoiBoost {
val hostedByMapPath = parser.get("hostedByMapPath")
val workingDirPath = parser.get("workingPath")
+ val openaireOrganizationPath = parser.get("openaireOrganizationPath")
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
override def zero: Publication = new Publication
@@ -156,7 +177,7 @@ object SparkGenerateDoiBoost {
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
val pub:Publication = item._1._2
val affiliation = item._2
- val affId:String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
+ val affId:String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
val r:Relation = new Relation
r.setSource(pub.getId)
r.setTarget(affId)
@@ -174,9 +195,35 @@ object SparkGenerateDoiBoost {
r1.setDataInfo(pub.getDataInfo)
r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
List(r, r1)
- })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
+ })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved")
+
+
+ val unresolvedRels:Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => {
+
+ if (r.getSource.startsWith("unresolved"))
+ (r.getSource, r)
+ else if (r.getTarget.startsWith("unresolved"))
+ (r.getTarget,r)
+ else
+ ("resolved", r)
+ })
+
+ val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2)
+
+ unresolvedRels.joinWith(openaireOrganization,unresolvedRels("_1").equalTo(openaireOrganization("_2")))
+ .map { x =>
+ val currentRels = x._1._2
+ val currentOrgs = x._2
+ if (currentOrgs!= null)
+ if(currentRels.getSource.startsWith("unresolved"))
+ currentRels.setSource(currentOrgs._1)
+ else
+ currentRels.setTarget(currentOrgs._1)
+ currentRels
+ }.write.save(s"$workingDirPath/doiBoostPublicationAffiliation")
+
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => {
val affiliation = item._2
if (affiliation.GridId.isEmpty) {
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json
index 1ff63dd0ec..39455fb677 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json
@@ -1,7 +1,8 @@
[
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
{"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true},
+ {"paramName": "oo", "paramLongName":"openaireOrganizationPath", "paramDescription": "the openaire Organization Path", "paramRequired": true},
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
- {"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true}
+ {"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true}
]
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
index f5596b60ee..eb82c3a7d6 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/process/oozie_app/workflow.xml
@@ -27,6 +27,12 @@
hostedByMapPath
the hostedByMap Path
+
+ openaireOrganizationPath
+ the OpenAire Organizations Path
+
+
+
outputPath
the Path of the sequence file action set
@@ -214,6 +220,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--hostedByMapPath${hostedByMapPath}
+ --openaireOrganizationPath${openaireOrganizationPath}
--affiliationPath${inputPathMAG}/dataset/Affiliations
--paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations
--workingPath${workingPath}