diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 073c80d1b0..3a6e7e8e20 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.countrypropagation; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Dataset; @@ -9,13 +10,14 @@ import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.*; import scala.Tuple2; import java.io.File; import java.util.*; +import static eu.dnetlib.dhp.PropagationConstant.*; + public class SparkCountryPropagationJob { public static void main(String[] args) throws Exception { @@ -51,9 +53,10 @@ public class SparkCountryPropagationJob { JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType())) + .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelType())) .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) - .mapToPair(toPair()); + .mapToPair(toPair()); //id is the organization identifier + JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) @@ -72,7 +75,7 @@ public class SparkCountryPropagationJob { .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); JavaPairRDD datasource_results = publications - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -81,7 +84,7 @@ public class SparkCountryPropagationJob { return ret.iterator(); }) .union(datasets - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -90,7 +93,7 @@ public class SparkCountryPropagationJob { return ret.iterator(); })) .union(software - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -99,7 +102,7 @@ public class SparkCountryPropagationJob { return ret.iterator(); })) .union(other - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -155,55 +158,53 @@ public class SparkCountryPropagationJob { results.leftOuterJoin(toupdateresult) .map(c -> { OafEntity oaf = c._2()._1(); - List qualifierList = null; + List countryList = null; if (oaf.getClass() == Publication.class) { - qualifierList = ((Publication) oaf).getCountry(); + countryList = ((Publication) oaf).getCountry(); } if (oaf.getClass() == Dataset.class){ - qualifierList = ((Dataset) oaf).getCountry(); + countryList = ((Dataset) oaf).getCountry(); } if (oaf.getClass() == Software.class){ - qualifierList = ((Software) oaf).getCountry(); + countryList = ((Software) oaf).getCountry(); } if (oaf.getClass() == OtherResearchProduct.class){ - qualifierList = ((OtherResearchProduct) oaf).getCountry(); + countryList = ((OtherResearchProduct) oaf).getCountry(); } if (c._2()._2().isPresent()) { HashSet countries = new HashSet<>(); - for (Qualifier country : qualifierList) { + for (Qualifier country : countryList) { countries.add(country.getClassid()); } TypedRow t = c._2()._2().get(); for (String country : t.getCountry().split(";")) { if (!countries.contains(country)) { - Qualifier q = new Qualifier(); - q.setClassid(country); - qualifierList.add(q); + countryList.add(getCountry(country)); } } if (oaf.getClass() == Publication.class) { - ((Publication) oaf).setCountry(qualifierList); + ((Publication) oaf).setCountry(countryList); return (Publication) oaf; } if (oaf.getClass() == Dataset.class){ - ((Dataset) oaf).setCountry(qualifierList); + ((Dataset) oaf).setCountry(countryList); return (Dataset) oaf; } if (oaf.getClass() == Software.class){ - ((Software) oaf).setCountry(qualifierList); + ((Software) oaf).setCountry(countryList); return (Software) oaf; } if (oaf.getClass() == OtherResearchProduct.class){ - ((OtherResearchProduct) oaf).setCountry(qualifierList); + ((OtherResearchProduct) oaf).setCountry(countryList); return (OtherResearchProduct) oaf; } } @@ -215,40 +216,7 @@ public class SparkCountryPropagationJob { .saveAsTextFile(outputPath+"/"+type); } - private static List getTypedRows(OafEntity oaf) { - List lst = new ArrayList<>(); - Set datasources_provenance = new HashSet<>(); - List instanceList = null; - String type = ""; - if (oaf.getClass() == Publication.class) { - instanceList = ((Publication) oaf).getInstance(); - type = "publication"; - } - if (oaf.getClass() == Dataset.class){ - instanceList = ((Dataset)oaf).getInstance(); - type = "dataset"; - } - if (oaf.getClass() == Software.class){ - instanceList = ((Software)oaf).getInstance(); - type = "software"; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - instanceList = ((OtherResearchProduct)oaf).getInstance(); - type = "otherresearchproduct"; - } - - - for (Instance i : instanceList) { - datasources_provenance.add(i.getCollectedfrom().getKey()); - datasources_provenance.add(i.getHostedby().getKey()); - } - for (String dsId : datasources_provenance) { - lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type)); - } - return lst; - } private static JavaPairRDD getResults(JavaSparkContext sc , String inputPath){ @@ -280,10 +248,6 @@ public class SparkCountryPropagationJob { - private static PairFunction toPair() { - return e -> new Tuple2<>( e.getSourceId(), e); - - }; } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 5b9504fec0..c8dd27dd1c 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -1,11 +1,22 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; +import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; - +import org.apache.hadoop.io.Text; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +import static eu.dnetlib.dhp.PropagationConstant.*; public class SparkResultToOrganizationFromIstRepoJob { public static void main(String[] args) throws Exception { @@ -28,78 +39,28 @@ public class SparkResultToOrganizationFromIstRepoJob { if (!directory.exists()) { directory.mkdirs(); } - } - } -/* -package eu.dnetlib.dhp.countrypropagation; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.*; -import scala.Tuple2; - -import java.io.File; -import java.util.*; -import java.util.stream.Stream; - -public class SparkCountryPropagationJob { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkCountryPropagationJob.class.getSimpleName()) - .master(parser.get("master")) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories"; - - File directory = new File(outputPath); - - if(!directory.exists()){ - directory.mkdirs(); - } - - //TODO: add as Job Parameters - List whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c"); - List allowedtypes = Arrays.asList("pubsrepository::institutional"); - - - JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) - .filter(org -> !org.getDataInfo().getDeletedbyinference()) - .map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid())) - .mapToPair(toPair()); - - JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType())) - .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) - .mapToPair(toPair()); + //get the institutional repositories JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) - .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid())) + .filter(ds -> INSTITUTIONAL_REPO_TYPE.equals(ds.getDatasourcetype().getClassid())) .map(ds -> new TypedRow().setSourceId(ds.getId())) .mapToPair(toPair()); + JavaPairRDD rel_datasource_organization = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_DATASOURCE_ORGANIZATION_REL_CLASS.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + JavaPairRDD instdatasource_organization = datasources.join(rel_datasource_organization) + .map(x -> x._2()._2()) + .mapToPair(toPair()); + + JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)); JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) @@ -110,7 +71,7 @@ public class SparkCountryPropagationJob { .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); JavaPairRDD datasource_results = publications - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -119,7 +80,7 @@ public class SparkCountryPropagationJob { return ret.iterator(); }) .union(datasets - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -128,16 +89,16 @@ public class SparkCountryPropagationJob { return ret.iterator(); })) .union(software - .map(oaf -> getTypedRows(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })) .union(other - .map(oaf -> getTypedRows(oaf)) + .map(oaf -> getTypedRowsDatasourceResult(oaf)) .flatMapToPair(f -> { ArrayList> ret = new ArrayList<>(); for (TypedRow t : f) { @@ -146,185 +107,23 @@ public class SparkCountryPropagationJob { return ret.iterator(); })); - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - JavaPairRDD datasource_country = organizations.join(organization_datasource) - .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) - .mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation) - - - JavaPairRDD alloweddatasources_country = datasources.join(datasource_country) - .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2())); - - - JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_results) - .map(u -> u._2()._2().setCountry(u._2()._1().getCountry())) - .mapToPair(toPair()) - .reduceByKey((a, p) -> { - if (a == null) { - return p; - } - if (p == null) { - return a; - } - HashSet countries = new HashSet(); - countries.addAll(Arrays.asList(a.getCountry().split(";"))); - countries.addAll(Arrays.asList(p.getCountry().split(";"))); - String country = new String(); - for (String c : countries) { - country += c + ";"; - } - - return a.setCountry(country); + JavaRDD newRels = instdatasource_organization.join(datasource_results) + .flatMap(c -> { + List rels = new ArrayList(); + rels.add(getRelation(c._2()._1().getTargetId(), c._2()._2().getTargetId(), RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + rels.add(getRelation(c._2()._2().getTargetId(), c._2()._1().getTargetId(), RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + return rels.iterator(); }); + newRels.map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation_new"); - updateResult(pubs, toupdateresult, outputPath, "publication"); - updateResult(dss, toupdateresult, outputPath, "dataset"); - updateResult(sfw, toupdateresult, outputPath, "software"); - updateResult(orp, toupdateresult, outputPath, "otherresearchproduct"); - //we use leftOuterJoin because we want to rebuild the entire structure - + newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/relation"); } - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { - results.leftOuterJoin(toupdateresult) - .map(c -> { - OafEntity oaf = c._2()._1(); - List qualifierList = null; - if (oaf.getClass() == Publication.class) { - qualifierList = ((Publication) oaf).getCountry(); - - } - if (oaf.getClass() == Dataset.class){ - qualifierList = ((Dataset) oaf).getCountry(); - } - - if (oaf.getClass() == Software.class){ - qualifierList = ((Software) oaf).getCountry(); - } - - if (oaf.getClass() == OtherResearchProduct.class){ - qualifierList = ((OtherResearchProduct) oaf).getCountry(); - } - - if (c._2()._2().isPresent()) { - HashSet countries = new HashSet<>(); - for (Qualifier country : qualifierList) { - countries.add(country.getClassid()); - } - TypedRow t = c._2()._2().get(); - - for (String country : t.getCountry().split(";")) { - if (!countries.contains(country)) { - Qualifier q = new Qualifier(); - q.setClassid(country); - qualifierList.add(q); - } - - } - if (oaf.getClass() == Publication.class) { - ((Publication) oaf).setCountry(qualifierList); - return (Publication) oaf; - - } - if (oaf.getClass() == Dataset.class){ - ((Dataset) oaf).setCountry(qualifierList); - return (Dataset) oaf; - } - - if (oaf.getClass() == Software.class){ - ((Software) oaf).setCountry(qualifierList); - return (Software) oaf; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - ((OtherResearchProduct) oaf).setCountry(qualifierList); - return (OtherResearchProduct) oaf; - } - } - - - return null; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); - } - - private static List getTypedRows(OafEntity oaf) { - List lst = new ArrayList<>(); - Set datasources_provenance = new HashSet<>(); - List instanceList = null; - String type = ""; - if (oaf.getClass() == Publication.class) { - instanceList = ((Publication) oaf).getInstance(); - type = "publication"; - } - if (oaf.getClass() == Dataset.class){ - instanceList = ((Dataset)oaf).getInstance(); - type = "dataset"; - } - - if (oaf.getClass() == Software.class){ - instanceList = ((Software)oaf).getInstance(); - type = "software"; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - instanceList = ((OtherResearchProduct)oaf).getInstance(); - type = "otherresearchproduct"; - } - - - for (Instance i : instanceList) { - datasources_provenance.add(i.getCollectedfrom().getKey()); - datasources_provenance.add(i.getHostedby().getKey()); - } - for (String dsId : datasources_provenance) { - lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type)); - } - return lst; - } - - - private static JavaPairRDD getResults(JavaSparkContext sc , String inputPath){ - - return - sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) - .filter(ds -> !ds.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId())) - .mapToPair(toPair()) - .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) - .filter(o -> !o.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId())) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) - .filter(s -> !s.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId())) - .mapToPair(toPair())) - .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) - .filter(p -> !p.getDataInfo().getDeletedbyinference()) - .map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId())) - .mapToPair(toPair())); - - - } - - - - private static PairFunction toPair() { - return e -> new Tuple2<>( e.getSourceId(), e); - - }; - } - - - */ \ No newline at end of file