forked from antonis.lempesis/dnet-hadoop
Fixed DoiBoost generation to point to correct organization in affiliation relation
This commit is contained in:
parent
b924276e18
commit
be79d74e3d
|
@ -13,10 +13,30 @@ import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
|||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.JsonAST.{JField, JObject, JString,JArray}
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
||||
object SparkGenerateDoiBoost {
|
||||
|
||||
|
||||
def extractIdGRID(input:String):List[(String,String)] = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: org.json4s.JValue = parse(input)
|
||||
|
||||
val id:String = (json \ "id").extract[String]
|
||||
|
||||
val grids:List[String] = for {
|
||||
|
||||
JObject(pid) <- json \ "pid"
|
||||
JField("qualifier", JObject(qualifier)) <- pid
|
||||
JField("classid", JString(classid)) <-qualifier
|
||||
JField("value", JString(vl)) <- pid
|
||||
if classid == "GRID"
|
||||
} yield vl
|
||||
grids.map(g => (id, s"unresolved::grid::${g.toLowerCase}"))(collection.breakOut)
|
||||
}
|
||||
|
||||
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
@ -36,6 +56,7 @@ object SparkGenerateDoiBoost {
|
|||
|
||||
val hostedByMapPath = parser.get("hostedByMapPath")
|
||||
val workingDirPath = parser.get("workingPath")
|
||||
val openaireOrganizationPath = parser.get("openaireOrganizationPath")
|
||||
|
||||
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
|
||||
override def zero: Publication = new Publication
|
||||
|
@ -156,7 +177,7 @@ object SparkGenerateDoiBoost {
|
|||
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
|
||||
val pub:Publication = item._1._2
|
||||
val affiliation = item._2
|
||||
val affId:String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
|
||||
val affId:String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
|
||||
val r:Relation = new Relation
|
||||
r.setSource(pub.getId)
|
||||
r.setTarget(affId)
|
||||
|
@ -174,9 +195,35 @@ object SparkGenerateDoiBoost {
|
|||
r1.setDataInfo(pub.getDataInfo)
|
||||
r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
|
||||
List(r, r1)
|
||||
})(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
|
||||
})(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved")
|
||||
|
||||
|
||||
|
||||
|
||||
val unresolvedRels:Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => {
|
||||
|
||||
if (r.getSource.startsWith("unresolved"))
|
||||
(r.getSource, r)
|
||||
else if (r.getTarget.startsWith("unresolved"))
|
||||
(r.getTarget,r)
|
||||
else
|
||||
("resolved", r)
|
||||
})
|
||||
|
||||
val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2)
|
||||
|
||||
unresolvedRels.joinWith(openaireOrganization,unresolvedRels("_1").equalTo(openaireOrganization("_2")))
|
||||
.map { x =>
|
||||
val currentRels = x._1._2
|
||||
val currentOrgs = x._2
|
||||
if (currentOrgs!= null)
|
||||
if(currentRels.getSource.startsWith("unresolved"))
|
||||
currentRels.setSource(currentOrgs._1)
|
||||
else
|
||||
currentRels.setTarget(currentOrgs._1)
|
||||
currentRels
|
||||
}.write.save(s"$workingDirPath/doiBoostPublicationAffiliation")
|
||||
|
||||
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => {
|
||||
val affiliation = item._2
|
||||
if (affiliation.GridId.isEmpty) {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
[
|
||||
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
|
||||
{"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true},
|
||||
{"paramName": "oo", "paramLongName":"openaireOrganizationPath", "paramDescription": "the openaire Organization Path", "paramRequired": true},
|
||||
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
|
||||
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
|
||||
{"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||
{"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||
]
|
||||
|
|
|
@ -27,6 +27,12 @@
|
|||
<name>hostedByMapPath</name>
|
||||
<description>the hostedByMap Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>openaireOrganizationPath</name>
|
||||
<description>the OpenAire Organizations Path</description>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>outputPath</name>
|
||||
<description>the Path of the sequence file action set</description>
|
||||
|
@ -214,6 +220,7 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
|
||||
<arg>--openaireOrganizationPath</arg><arg>${openaireOrganizationPath}</arg>
|
||||
<arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
|
||||
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
|
|
Loading…
Reference in New Issue