forked from D-Net/dnet-hadoop
changed relclass and reltype in reelation specification for country propagation and implementation of propagation of result affiliation through institutional repositories
This commit is contained in:
parent
ed262293a6
commit
b736a9581c
|
@ -1,6 +1,7 @@
|
||||||
package eu.dnetlib.dhp.countrypropagation;
|
package eu.dnetlib.dhp.countrypropagation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.dhp.TypedRow;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
@ -9,13 +10,14 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
|
||||||
public class SparkCountryPropagationJob {
|
public class SparkCountryPropagationJob {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
@ -51,9 +53,10 @@ public class SparkCountryPropagationJob {
|
||||||
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
|
||||||
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType()))
|
.filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelType()))
|
||||||
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
|
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
|
||||||
.mapToPair(toPair());
|
.mapToPair(toPair()); //id is the organization identifier
|
||||||
|
|
||||||
|
|
||||||
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
|
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
|
||||||
|
@ -72,7 +75,7 @@ public class SparkCountryPropagationJob {
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
|
||||||
|
|
||||||
JavaPairRDD<String, TypedRow> datasource_results = publications
|
JavaPairRDD<String, TypedRow> datasource_results = publications
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -81,7 +84,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
})
|
})
|
||||||
.union(datasets
|
.union(datasets
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -90,7 +93,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
}))
|
}))
|
||||||
.union(software
|
.union(software
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -99,7 +102,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
}))
|
}))
|
||||||
.union(other
|
.union(other
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -155,55 +158,53 @@ public class SparkCountryPropagationJob {
|
||||||
results.leftOuterJoin(toupdateresult)
|
results.leftOuterJoin(toupdateresult)
|
||||||
.map(c -> {
|
.map(c -> {
|
||||||
OafEntity oaf = c._2()._1();
|
OafEntity oaf = c._2()._1();
|
||||||
List<Qualifier> qualifierList = null;
|
List<Country> countryList = null;
|
||||||
if (oaf.getClass() == Publication.class) {
|
if (oaf.getClass() == Publication.class) {
|
||||||
qualifierList = ((Publication) oaf).getCountry();
|
countryList = ((Publication) oaf).getCountry();
|
||||||
|
|
||||||
}
|
}
|
||||||
if (oaf.getClass() == Dataset.class){
|
if (oaf.getClass() == Dataset.class){
|
||||||
qualifierList = ((Dataset) oaf).getCountry();
|
countryList = ((Dataset) oaf).getCountry();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
if (oaf.getClass() == Software.class){
|
||||||
qualifierList = ((Software) oaf).getCountry();
|
countryList = ((Software) oaf).getCountry();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
if (oaf.getClass() == OtherResearchProduct.class){
|
||||||
qualifierList = ((OtherResearchProduct) oaf).getCountry();
|
countryList = ((OtherResearchProduct) oaf).getCountry();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (c._2()._2().isPresent()) {
|
if (c._2()._2().isPresent()) {
|
||||||
HashSet<String> countries = new HashSet<>();
|
HashSet<String> countries = new HashSet<>();
|
||||||
for (Qualifier country : qualifierList) {
|
for (Qualifier country : countryList) {
|
||||||
countries.add(country.getClassid());
|
countries.add(country.getClassid());
|
||||||
}
|
}
|
||||||
TypedRow t = c._2()._2().get();
|
TypedRow t = c._2()._2().get();
|
||||||
|
|
||||||
for (String country : t.getCountry().split(";")) {
|
for (String country : t.getCountry().split(";")) {
|
||||||
if (!countries.contains(country)) {
|
if (!countries.contains(country)) {
|
||||||
Qualifier q = new Qualifier();
|
countryList.add(getCountry(country));
|
||||||
q.setClassid(country);
|
|
||||||
qualifierList.add(q);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if (oaf.getClass() == Publication.class) {
|
if (oaf.getClass() == Publication.class) {
|
||||||
((Publication) oaf).setCountry(qualifierList);
|
((Publication) oaf).setCountry(countryList);
|
||||||
return (Publication) oaf;
|
return (Publication) oaf;
|
||||||
|
|
||||||
}
|
}
|
||||||
if (oaf.getClass() == Dataset.class){
|
if (oaf.getClass() == Dataset.class){
|
||||||
((Dataset) oaf).setCountry(qualifierList);
|
((Dataset) oaf).setCountry(countryList);
|
||||||
return (Dataset) oaf;
|
return (Dataset) oaf;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
if (oaf.getClass() == Software.class){
|
||||||
((Software) oaf).setCountry(qualifierList);
|
((Software) oaf).setCountry(countryList);
|
||||||
return (Software) oaf;
|
return (Software) oaf;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
if (oaf.getClass() == OtherResearchProduct.class){
|
||||||
((OtherResearchProduct) oaf).setCountry(qualifierList);
|
((OtherResearchProduct) oaf).setCountry(countryList);
|
||||||
return (OtherResearchProduct) oaf;
|
return (OtherResearchProduct) oaf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,40 +216,7 @@ public class SparkCountryPropagationJob {
|
||||||
.saveAsTextFile(outputPath+"/"+type);
|
.saveAsTextFile(outputPath+"/"+type);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<TypedRow> getTypedRows(OafEntity oaf) {
|
|
||||||
List<TypedRow> lst = new ArrayList<>();
|
|
||||||
Set<String> datasources_provenance = new HashSet<>();
|
|
||||||
List<Instance> instanceList = null;
|
|
||||||
String type = "";
|
|
||||||
if (oaf.getClass() == Publication.class) {
|
|
||||||
instanceList = ((Publication) oaf).getInstance();
|
|
||||||
type = "publication";
|
|
||||||
}
|
|
||||||
if (oaf.getClass() == Dataset.class){
|
|
||||||
instanceList = ((Dataset)oaf).getInstance();
|
|
||||||
type = "dataset";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
|
||||||
instanceList = ((Software)oaf).getInstance();
|
|
||||||
type = "software";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
|
||||||
instanceList = ((OtherResearchProduct)oaf).getInstance();
|
|
||||||
type = "otherresearchproduct";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
for (Instance i : instanceList) {
|
|
||||||
datasources_provenance.add(i.getCollectedfrom().getKey());
|
|
||||||
datasources_provenance.add(i.getHostedby().getKey());
|
|
||||||
}
|
|
||||||
for (String dsId : datasources_provenance) {
|
|
||||||
lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type));
|
|
||||||
}
|
|
||||||
return lst;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc , String inputPath){
|
private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc , String inputPath){
|
||||||
|
@ -280,10 +248,6 @@ public class SparkCountryPropagationJob {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static PairFunction<TypedRow, String, TypedRow> toPair() {
|
|
||||||
return e -> new Tuple2<>( e.getSourceId(), e);
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,22 @@
|
||||||
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
|
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.TypedRow;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
|
|
||||||
public class SparkResultToOrganizationFromIstRepoJob {
|
public class SparkResultToOrganizationFromIstRepoJob {
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
@ -28,78 +39,28 @@ public class SparkResultToOrganizationFromIstRepoJob {
|
||||||
if (!directory.exists()) {
|
if (!directory.exists()) {
|
||||||
directory.mkdirs();
|
directory.mkdirs();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
package eu.dnetlib.dhp.countrypropagation;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.Optional;
|
|
||||||
import org.apache.spark.api.java.function.Function2;
|
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
|
||||||
import org.apache.spark.rdd.RDD;
|
|
||||||
import org.apache.spark.sql.*;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
public class SparkCountryPropagationJob {
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")));
|
|
||||||
parser.parseArgument(args);
|
|
||||||
final SparkSession spark = SparkSession
|
|
||||||
.builder()
|
|
||||||
.appName(SparkCountryPropagationJob.class.getSimpleName())
|
|
||||||
.master(parser.get("master"))
|
|
||||||
.enableHiveSupport()
|
|
||||||
.getOrCreate();
|
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
|
||||||
final String inputPath = parser.get("sourcePath");
|
|
||||||
final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
|
|
||||||
|
|
||||||
File directory = new File(outputPath);
|
|
||||||
|
|
||||||
if(!directory.exists()){
|
|
||||||
directory.mkdirs();
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO: add as Job Parameters
|
|
||||||
List<String> whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c");
|
|
||||||
List<String> allowedtypes = Arrays.asList("pubsrepository::institutional");
|
|
||||||
|
|
||||||
|
|
||||||
JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
|
|
||||||
.filter(org -> !org.getDataInfo().getDeletedbyinference())
|
|
||||||
.map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid()))
|
|
||||||
.mapToPair(toPair());
|
|
||||||
|
|
||||||
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
|
|
||||||
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
|
||||||
.filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType()))
|
|
||||||
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
|
|
||||||
.mapToPair(toPair());
|
|
||||||
|
|
||||||
|
//get the institutional repositories
|
||||||
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
|
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
|
||||||
.filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid()))
|
.filter(ds -> INSTITUTIONAL_REPO_TYPE.equals(ds.getDatasourcetype().getClassid()))
|
||||||
.map(ds -> new TypedRow().setSourceId(ds.getId()))
|
.map(ds -> new TypedRow().setSourceId(ds.getId()))
|
||||||
.mapToPair(toPair());
|
.mapToPair(toPair());
|
||||||
|
|
||||||
|
|
||||||
|
JavaPairRDD<String, TypedRow> rel_datasource_organization = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
||||||
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
|
||||||
|
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||||
|
.filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_DATASOURCE_ORGANIZATION_REL_CLASS.equals(r.getRelType()))
|
||||||
|
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
|
||||||
|
.mapToPair(toPair());
|
||||||
|
|
||||||
|
JavaPairRDD<String, TypedRow> instdatasource_organization = datasources.join(rel_datasource_organization)
|
||||||
|
.map(x -> x._2()._2())
|
||||||
|
.mapToPair(toPair());
|
||||||
|
|
||||||
|
JavaRDD<Relation> relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
|
||||||
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class));
|
||||||
JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
|
||||||
JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
|
JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
|
||||||
|
@ -110,7 +71,7 @@ public class SparkCountryPropagationJob {
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
|
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
|
||||||
|
|
||||||
JavaPairRDD<String, TypedRow> datasource_results = publications
|
JavaPairRDD<String, TypedRow> datasource_results = publications
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -119,7 +80,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
})
|
})
|
||||||
.union(datasets
|
.union(datasets
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -128,7 +89,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
}))
|
}))
|
||||||
.union(software
|
.union(software
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -137,7 +98,7 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
}))
|
}))
|
||||||
.union(other
|
.union(other
|
||||||
.map(oaf -> getTypedRows(oaf))
|
.map(oaf -> getTypedRowsDatasourceResult(oaf))
|
||||||
.flatMapToPair(f -> {
|
.flatMapToPair(f -> {
|
||||||
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
|
||||||
for (TypedRow t : f) {
|
for (TypedRow t : f) {
|
||||||
|
@ -146,185 +107,23 @@ public class SparkCountryPropagationJob {
|
||||||
return ret.iterator();
|
return ret.iterator();
|
||||||
}));
|
}));
|
||||||
|
|
||||||
JavaPairRDD<String, OafEntity> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
JavaRDD<Relation> newRels = instdatasource_organization.join(datasource_results)
|
||||||
JavaPairRDD<String, OafEntity> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
.flatMap(c -> {
|
||||||
JavaPairRDD<String, OafEntity> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
List<Relation> rels = new ArrayList();
|
||||||
JavaPairRDD<String, OafEntity> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
|
rels.add(getRelation(c._2()._1().getTargetId(), c._2()._2().getTargetId(), RELATION_ORGANIZATION_RESULT_REL_CLASS,
|
||||||
|
RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||||
JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
|
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
|
||||||
.map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation)
|
rels.add(getRelation(c._2()._2().getTargetId(), c._2()._1().getTargetId(), RELATION_ORGANIZATION_RESULT_REL_CLASS,
|
||||||
.mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation)
|
RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
|
||||||
|
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
|
||||||
|
return rels.iterator();
|
||||||
JavaPairRDD<String, TypedRow> alloweddatasources_country = datasources.join(datasource_country)
|
|
||||||
.mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2()));
|
|
||||||
|
|
||||||
|
|
||||||
JavaPairRDD<String,TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results)
|
|
||||||
.map(u -> u._2()._2().setCountry(u._2()._1().getCountry()))
|
|
||||||
.mapToPair(toPair())
|
|
||||||
.reduceByKey((a, p) -> {
|
|
||||||
if (a == null) {
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
if (p == null) {
|
|
||||||
return a;
|
|
||||||
}
|
|
||||||
HashSet<String> countries = new HashSet();
|
|
||||||
countries.addAll(Arrays.asList(a.getCountry().split(";")));
|
|
||||||
countries.addAll(Arrays.asList(p.getCountry().split(";")));
|
|
||||||
String country = new String();
|
|
||||||
for (String c : countries) {
|
|
||||||
country += c + ";";
|
|
||||||
}
|
|
||||||
|
|
||||||
return a.setCountry(country);
|
|
||||||
});
|
});
|
||||||
|
newRels.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||||
|
.saveAsTextFile(outputPath + "/relation_new");
|
||||||
|
|
||||||
updateResult(pubs, toupdateresult, outputPath, "publication");
|
newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p))
|
||||||
updateResult(dss, toupdateresult, outputPath, "dataset");
|
.saveAsTextFile(outputPath + "/relation");
|
||||||
updateResult(sfw, toupdateresult, outputPath, "software");
|
|
||||||
updateResult(orp, toupdateresult, outputPath, "otherresearchproduct");
|
|
||||||
//we use leftOuterJoin because we want to rebuild the entire structure
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void updateResult(JavaPairRDD<String, OafEntity> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
|
|
||||||
results.leftOuterJoin(toupdateresult)
|
|
||||||
.map(c -> {
|
|
||||||
OafEntity oaf = c._2()._1();
|
|
||||||
List<Qualifier> qualifierList = null;
|
|
||||||
if (oaf.getClass() == Publication.class) {
|
|
||||||
qualifierList = ((Publication) oaf).getCountry();
|
|
||||||
|
|
||||||
}
|
|
||||||
if (oaf.getClass() == Dataset.class){
|
|
||||||
qualifierList = ((Dataset) oaf).getCountry();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
|
||||||
qualifierList = ((Software) oaf).getCountry();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
|
||||||
qualifierList = ((OtherResearchProduct) oaf).getCountry();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c._2()._2().isPresent()) {
|
|
||||||
HashSet<String> countries = new HashSet<>();
|
|
||||||
for (Qualifier country : qualifierList) {
|
|
||||||
countries.add(country.getClassid());
|
|
||||||
}
|
|
||||||
TypedRow t = c._2()._2().get();
|
|
||||||
|
|
||||||
for (String country : t.getCountry().split(";")) {
|
|
||||||
if (!countries.contains(country)) {
|
|
||||||
Qualifier q = new Qualifier();
|
|
||||||
q.setClassid(country);
|
|
||||||
qualifierList.add(q);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
if (oaf.getClass() == Publication.class) {
|
|
||||||
((Publication) oaf).setCountry(qualifierList);
|
|
||||||
return (Publication) oaf;
|
|
||||||
|
|
||||||
}
|
|
||||||
if (oaf.getClass() == Dataset.class){
|
|
||||||
((Dataset) oaf).setCountry(qualifierList);
|
|
||||||
return (Dataset) oaf;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
|
||||||
((Software) oaf).setCountry(qualifierList);
|
|
||||||
return (Software) oaf;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
|
||||||
((OtherResearchProduct) oaf).setCountry(qualifierList);
|
|
||||||
return (OtherResearchProduct) oaf;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
return null;
|
|
||||||
})
|
|
||||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
|
||||||
.saveAsTextFile(outputPath+"/"+type);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static List<TypedRow> getTypedRows(OafEntity oaf) {
|
|
||||||
List<TypedRow> lst = new ArrayList<>();
|
|
||||||
Set<String> datasources_provenance = new HashSet<>();
|
|
||||||
List<Instance> instanceList = null;
|
|
||||||
String type = "";
|
|
||||||
if (oaf.getClass() == Publication.class) {
|
|
||||||
instanceList = ((Publication) oaf).getInstance();
|
|
||||||
type = "publication";
|
|
||||||
}
|
|
||||||
if (oaf.getClass() == Dataset.class){
|
|
||||||
instanceList = ((Dataset)oaf).getInstance();
|
|
||||||
type = "dataset";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == Software.class){
|
|
||||||
instanceList = ((Software)oaf).getInstance();
|
|
||||||
type = "software";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (oaf.getClass() == OtherResearchProduct.class){
|
|
||||||
instanceList = ((OtherResearchProduct)oaf).getInstance();
|
|
||||||
type = "otherresearchproduct";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
for (Instance i : instanceList) {
|
|
||||||
datasources_provenance.add(i.getCollectedfrom().getKey());
|
|
||||||
datasources_provenance.add(i.getHostedby().getKey());
|
|
||||||
}
|
|
||||||
for (String dsId : datasources_provenance) {
|
|
||||||
lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type));
|
|
||||||
}
|
|
||||||
return lst;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc , String inputPath){
|
|
||||||
|
|
||||||
return
|
|
||||||
sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
|
|
||||||
.filter(ds -> !ds.getDataInfo().getDeletedbyinference())
|
|
||||||
.map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId()))
|
|
||||||
.mapToPair(toPair())
|
|
||||||
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
|
|
||||||
.filter(o -> !o.getDataInfo().getDeletedbyinference())
|
|
||||||
.map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId()))
|
|
||||||
.mapToPair(toPair()))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
|
|
||||||
.filter(s -> !s.getDataInfo().getDeletedbyinference())
|
|
||||||
.map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId()))
|
|
||||||
.mapToPair(toPair()))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
|
||||||
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
|
|
||||||
.filter(p -> !p.getDataInfo().getDeletedbyinference())
|
|
||||||
.map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId()))
|
|
||||||
.mapToPair(toPair()));
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static PairFunction<TypedRow, String, TypedRow> toPair() {
|
|
||||||
return e -> new Tuple2<>( e.getSourceId(), e);
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
*/
|
|
Loading…
Reference in New Issue