[stats wf] indicators across stats dbs & updates in the org ids #248
|
@ -208,7 +208,7 @@ object SparkGenerateDoiBoost {
|
||||||
(r.getTarget,r)
|
(r.getTarget,r)
|
||||||
else
|
else
|
||||||
("resolved", r)
|
("resolved", r)
|
||||||
})
|
})(Encoders.tuple(Encoders.STRING, mapEncoderRel))
|
||||||
|
|
||||||
val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2)
|
val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2)
|
||||||
|
|
||||||
|
@ -222,7 +222,7 @@ object SparkGenerateDoiBoost {
|
||||||
else
|
else
|
||||||
currentRels.setTarget(currentOrgs._1)
|
currentRels.setTarget(currentOrgs._1)
|
||||||
currentRels
|
currentRels
|
||||||
}.write.save(s"$workingDirPath/doiBoostPublicationAffiliation")
|
}.filter(r=> !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
|
||||||
|
|
||||||
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => {
|
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => {
|
||||||
val affiliation = item._2
|
val affiliation = item._2
|
||||||
|
|
Loading…
Reference in New Issue