diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index e501b4823..c08e09f53 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -206,9 +206,9 @@ object SparkGenerateDoiBoost { (r.getSource, r) else if (r.getTarget.startsWith("unresolved")) (r.getTarget,r) - else + else ("resolved", r) - }) + })(Encoders.tuple(Encoders.STRING, mapEncoderRel)) val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2) @@ -221,8 +221,8 @@ object SparkGenerateDoiBoost { currentRels.setSource(currentOrgs._1) else currentRels.setTarget(currentOrgs._1) - currentRels - }.write.save(s"$workingDirPath/doiBoostPublicationAffiliation") + currentRels + }.filter(r=> !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => { val affiliation = item._2 @@ -242,6 +242,6 @@ object SparkGenerateDoiBoost { else null }).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") - } + } -} +} \ No newline at end of file