diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index ca126ac1aa..0261e3887f 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -111,10 +111,10 @@ public class SparkCountryPropagationJob { return ret.iterator(); })); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); JavaPairRDD datasource_country = organizations.join(organization_datasource) .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) @@ -147,27 +147,11 @@ public class SparkCountryPropagationJob { } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { results.leftOuterJoin(toupdateresult) .map(c -> { - OafEntity oaf = c._2()._1(); - List countryList = null; - if (oaf.getClass() == Publication.class) { - countryList = ((Publication) oaf).getCountry(); - - } - if (oaf.getClass() == Dataset.class){ - countryList = ((Dataset) oaf).getCountry(); - } - - if (oaf.getClass() == Software.class){ - countryList = ((Software) oaf).getCountry(); - } - - if (oaf.getClass() == OtherResearchProduct.class){ - countryList = ((OtherResearchProduct) oaf).getCountry(); - } - + Result oaf = c._2()._1(); + List countryList = oaf.getCountry(); if (c._2()._2().isPresent()) { HashSet countries = new HashSet<>(); for (Qualifier country : countryList) { @@ -181,29 +165,10 @@ public class SparkCountryPropagationJob { } } - if (oaf.getClass() == Publication.class) { - ((Publication) oaf).setCountry(countryList); - return (Publication) oaf; - - } - if (oaf.getClass() == Dataset.class){ - ((Dataset) oaf).setCountry(countryList); - return (Dataset) oaf; - } - - if (oaf.getClass() == Software.class){ - ((Software) oaf).setCountry(countryList); - return (Software) oaf; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - ((OtherResearchProduct) oaf).setCountry(countryList); - return (OtherResearchProduct) oaf; - } + oaf.setCountry(countryList); } - - return null; + return oaf; }) .map(p -> new ObjectMapper().writeValueAsString(p)) .saveAsTextFile(outputPath+"/"+type);