From b50166b9ad9ae7b92789ebb0dbb4231b3d1df22f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 28 Feb 2020 18:25:28 +0100 Subject: [PATCH] Use Result in updateResult instead of per-class OafEntity casts --- .../SparkCountryPropagationJob.java | 53 ++++--------------- 1 file changed, 9 insertions(+), 44 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index ca126ac1a..0261e3887 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -111,10 +111,10 @@ public class SparkCountryPropagationJob { return ret.iterator(); })); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); JavaPairRDD datasource_country = organizations.join(organization_datasource) .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) @@ -147,27 +147,11 @@ public class SparkCountryPropagationJob { } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { results.leftOuterJoin(toupdateresult) .map(c 
-> { - OafEntity oaf = c._2()._1(); - List countryList = null; - if (oaf.getClass() == Publication.class) { - countryList = ((Publication) oaf).getCountry(); - - } - if (oaf.getClass() == Dataset.class){ - countryList = ((Dataset) oaf).getCountry(); - } - - if (oaf.getClass() == Software.class){ - countryList = ((Software) oaf).getCountry(); - } - - if (oaf.getClass() == OtherResearchProduct.class){ - countryList = ((OtherResearchProduct) oaf).getCountry(); - } - + Result oaf = c._2()._1(); + List countryList = oaf.getCountry(); if (c._2()._2().isPresent()) { HashSet countries = new HashSet<>(); for (Qualifier country : countryList) { @@ -181,29 +165,10 @@ public class SparkCountryPropagationJob { } } - if (oaf.getClass() == Publication.class) { - ((Publication) oaf).setCountry(countryList); - return (Publication) oaf; - - } - if (oaf.getClass() == Dataset.class){ - ((Dataset) oaf).setCountry(countryList); - return (Dataset) oaf; - } - - if (oaf.getClass() == Software.class){ - ((Software) oaf).setCountry(countryList); - return (Software) oaf; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - ((OtherResearchProduct) oaf).setCountry(countryList); - return (OtherResearchProduct) oaf; - } + oaf.setCountry(countryList); } - - return null; + return oaf; }) .map(p -> new ObjectMapper().writeValueAsString(p)) .saveAsTextFile(outputPath+"/"+type);