This commit is contained in:
Miriam Baglioni 2020-02-28 18:25:28 +01:00
parent 550cb21c23
commit b50166b9ad
1 changed files with 9 additions and 44 deletions

View File

@ -111,10 +111,10 @@ public class SparkCountryPropagationJob {
return ret.iterator();
}));
JavaPairRDD<String, OafEntity> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
.map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation)
@ -147,27 +147,11 @@ public class SparkCountryPropagationJob {
}
private static void updateResult(JavaPairRDD<String, OafEntity> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
results.leftOuterJoin(toupdateresult)
.map(c -> {
OafEntity oaf = c._2()._1();
List<Country> countryList = null;
if (oaf.getClass() == Publication.class) {
countryList = ((Publication) oaf).getCountry();
}
if (oaf.getClass() == Dataset.class){
countryList = ((Dataset) oaf).getCountry();
}
if (oaf.getClass() == Software.class){
countryList = ((Software) oaf).getCountry();
}
if (oaf.getClass() == OtherResearchProduct.class){
countryList = ((OtherResearchProduct) oaf).getCountry();
}
Result oaf = c._2()._1();
List<Country> countryList = oaf.getCountry();
if (c._2()._2().isPresent()) {
HashSet<String> countries = new HashSet<>();
for (Qualifier country : countryList) {
@ -181,29 +165,10 @@ public class SparkCountryPropagationJob {
}
}
if (oaf.getClass() == Publication.class) {
((Publication) oaf).setCountry(countryList);
return (Publication) oaf;
}
if (oaf.getClass() == Dataset.class){
((Dataset) oaf).setCountry(countryList);
return (Dataset) oaf;
oaf.setCountry(countryList);
}
if (oaf.getClass() == Software.class){
((Software) oaf).setCountry(countryList);
return (Software) oaf;
}
if (oaf.getClass() == OtherResearchProduct.class){
((OtherResearchProduct) oaf).setCountry(countryList);
return (OtherResearchProduct) oaf;
}
}
return null;
return oaf;
})
.map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath+"/"+type);