added set accumulator in TypedRow and used it to accumulate country information in Country Propagation

This commit is contained in:
Miriam Baglioni 2020-02-19 15:02:50 +01:00
parent bb0fdf5e0a
commit ab84163bb3
2 changed files with 19 additions and 24 deletions

View File

@ -2,31 +2,33 @@ package eu.dnetlib.dhp;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.*;
import java.util.Iterator;
import java.util.List;
public class TypedRow implements Serializable { public class TypedRow implements Serializable {
private String sourceId; private String sourceId;
private String targetId; private String targetId;
private String type; private String type;
private String country; private String value;
private Set<String> accumulator;
public List<String> getAccumulator() { public Set<String> getAccumulator() {
return accumulator; return accumulator;
} }
public TypedRow setAccumulator(List<String> accumulator) { public TypedRow setAccumulator(Set<String> accumulator) {
this.accumulator = accumulator; this.accumulator = accumulator;
return this; return this;
} }
private List<String> accumulator;
public void addAll(Set<String> toadd){
this.accumulator.addAll(toadd);
}
public void add(String a){ public void add(String a){
if (accumulator == null){ if (accumulator == null){
accumulator = new ArrayList<>(); accumulator = new HashSet<>();
} }
accumulator.add(a); accumulator.add(a);
} }
@ -35,12 +37,12 @@ public class TypedRow implements Serializable {
return accumulator.iterator(); return accumulator.iterator();
} }
public String getCountry() { public String getValue() {
return country; return value;
} }
public TypedRow setCountry(String country) { public TypedRow setValue(String value) {
this.country = country; this.value = value;
return this; return this;
} }

View File

@ -47,7 +47,7 @@ public class SparkCountryPropagationJob {
JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
.filter(org -> !org.getDataInfo().getDeletedbyinference()) .filter(org -> !org.getDataInfo().getDeletedbyinference())
.map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid())) .map(org -> new TypedRow().setSourceId(org.getId()).setValue(org.getCountry().getClassid()))
.mapToPair(toPair()); .mapToPair(toPair());
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
@ -126,7 +126,7 @@ public class SparkCountryPropagationJob {
JavaPairRDD<String,TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results) JavaPairRDD<String,TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results)
.map(u -> u._2()._2().setCountry(u._2()._1().getCountry())) .map(u -> u._2()._2().setValue(u._2()._1().getValue()))
.mapToPair(toPair()) .mapToPair(toPair())
.reduceByKey((a, p) -> { .reduceByKey((a, p) -> {
if (a == null) { if (a == null) {
@ -135,15 +135,8 @@ public class SparkCountryPropagationJob {
if (p == null) { if (p == null) {
return a; return a;
} }
HashSet<String> countries = new HashSet(); a.addAll(p.getAccumulator());
countries.addAll(Arrays.asList(a.getCountry().split(";"))); return a;
countries.addAll(Arrays.asList(p.getCountry().split(";")));
String country = new String();
for (String c : countries) {
country += c + ";";
}
return a.setCountry(country);
}); });
updateResult(pubs, toupdateresult, outputPath, "publication"); updateResult(pubs, toupdateresult, outputPath, "publication");
@ -182,7 +175,7 @@ public class SparkCountryPropagationJob {
} }
TypedRow t = c._2()._2().get(); TypedRow t = c._2()._2().get();
for (String country : t.getCountry().split(";")) { for (String country : t.getAccumulator()) {
if (!countries.contains(country)) { if (!countries.contains(country)) {
countryList.add(getCountry(country)); countryList.add(getCountry(country));
} }