diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java index d08b0e33d1..13f3ca611d 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java @@ -2,31 +2,33 @@ package eu.dnetlib.dhp; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; public class TypedRow implements Serializable { private String sourceId; private String targetId; private String type; - private String country; + private String value; + private Set accumulator; - public List getAccumulator() { + public Set getAccumulator() { return accumulator; } - public TypedRow setAccumulator(List accumulator) { + public TypedRow setAccumulator(Set accumulator) { this.accumulator = accumulator; return this; } - private List accumulator; + + public void addAll(Set toadd){ + this.accumulator.addAll(toadd); + } public void add(String a){ if (accumulator == null){ - accumulator = new ArrayList<>(); + accumulator = new HashSet<>(); } accumulator.add(a); } @@ -35,12 +37,12 @@ public class TypedRow implements Serializable { return accumulator.iterator(); } - public String getCountry() { - return country; + public String getValue() { + return value; } - public TypedRow setCountry(String country) { - this.country = country; + public TypedRow setValue(String value) { + this.value = value; return this; } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 3a6e7e8e20..ca126ac1aa 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -47,7 +47,7 @@ public class SparkCountryPropagationJob { JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) .filter(org -> !org.getDataInfo().getDeletedbyinference()) - .map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid())) + .map(org -> new TypedRow().setSourceId(org.getId()).setValue(org.getCountry().getClassid())) .mapToPair(toPair()); JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) @@ -126,7 +126,7 @@ public class SparkCountryPropagationJob { JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_results) - .map(u -> u._2()._2().setCountry(u._2()._1().getCountry())) + .map(u -> u._2()._2().setValue(u._2()._1().getValue())) .mapToPair(toPair()) .reduceByKey((a, p) -> { if (a == null) { @@ -135,15 +135,8 @@ public class SparkCountryPropagationJob { if (p == null) { return a; } - HashSet countries = new HashSet(); - countries.addAll(Arrays.asList(a.getCountry().split(";"))); - countries.addAll(Arrays.asList(p.getCountry().split(";"))); - String country = new String(); - for (String c : countries) { - country += c + ";"; - } - - return a.setCountry(country); + a.addAll(p.getAccumulator()); + return a; }); updateResult(pubs, toupdateresult, outputPath, "publication"); @@ -182,7 +175,7 @@ public class SparkCountryPropagationJob { } TypedRow t = c._2()._2().get(); - for (String country : t.getCountry().split(";")) { + for (String country : t.getAccumulator()) { if (!countries.contains(country)) { countryList.add(getCountry(country)); }