From c7bc73aedf804f15c5a5faa3436b04dbb94d7645 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 17 Feb 2020 11:44:48 +0100 Subject: [PATCH] country propagation for results collected from institutional repositories --- dhp-workflows/dhp-bulktag/pom.xml | 15 + dhp-workflows/dhp-propagation/pom.xml | 39 +++ .../SparkCountryPropagationJob.java | 308 ++++++++++++++++++ .../dhp/countrypropagation/TypedRow.java | 70 ++++ .../countrypropagation-ovverride.properties | 3 + .../input_countrypropagation_parameters.json | 5 + .../oozie_app/config-default.xml | 18 + .../countrypropagation/oozie_app/workflow.xml | 55 ++++ dhp-workflows/pom.xml | 2 + 9 files changed, 515 insertions(+) create mode 100644 dhp-workflows/dhp-bulktag/pom.xml create mode 100644 dhp-workflows/dhp-propagation/pom.xml create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-bulktag/pom.xml b/dhp-workflows/dhp-bulktag/pom.xml new file mode 100644 index 0000000000..5b6a210271 --- /dev/null +++ b/dhp-workflows/dhp-bulktag/pom.xml @@ -0,0 +1,15 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.0.5-SNAPSHOT + + 4.0.0 + + dhp-bulktag + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/pom.xml b/dhp-workflows/dhp-propagation/pom.xml new file mode 100644 index 0000000000..10fcd38bb4 --- /dev/null +++ b/dhp-workflows/dhp-propagation/pom.xml @@ -0,0 +1,39 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.0.5-SNAPSHOT + + 4.0.0 + + dhp-propagation + + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java new file mode 100644 index 0000000000..abcec9dbbe --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -0,0 +1,308 @@ +package eu.dnetlib.dhp.countrypropagation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.*; +import scala.Tuple2; + +import java.util.*; +import java.util.stream.Stream; + +public class SparkCountryPropagationJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkCountryPropagationJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = parser.get("outputPath"); + + + List whitelist = new ArrayList<>(); + List allowedtypes = new ArrayList<>(); + + + + // JavaPairRDD results = getResults(sc, inputPath); +// sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) +// .map(oaf -> new TypedRow().setType("dataset").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId())) +// .mapToPair(toPair()) +// .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) +// .map(oaf -> new TypedRow().setType("otherresearchproduct").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId())) +// .mapToPair(toPair())) +// .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) +// .map(oaf -> new TypedRow().setType("software").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId())) +// .mapToPair(toPair())) +// .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) +// .map(oaf -> new TypedRow().setType("publication").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId())) +// .mapToPair(toPair())); +// +// + JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) + .filter(org -> !org.getDataInfo().getDeletedbyinference()) + .map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid())) + .mapToPair(toPair()); + + JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) + .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid())) + .map(ds -> new TypedRow().setSourceId(ds.getId())) + .mapToPair(toPair()); + + + JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); + JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); + JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)); + JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); + + JavaPairRDD datasource_results = publications + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + }) + .union(datasets + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })) + .union(software + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })) + .union(other + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })); + + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + + JavaPairRDD datasource_country = organizations.join(organization_datasource) + .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) + .mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation) + + + JavaPairRDD alloweddatasources_country = datasources.join(datasource_country) + .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2())); + + + JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_results) + .map(u -> u._2()._2().setCountry(u._2()._1().getCountry())) + .mapToPair(toPair()) + .reduceByKey((a, p) -> { + if (a == null) { + return p; + } + if (p == null) { + return a; + } + HashSet countries = new HashSet(); + countries.addAll(Arrays.asList(a.getCountry().split(";"))); + countries.addAll(Arrays.asList(p.getCountry().split(";"))); + String country = new String(); + for (String c : countries) { + country += c + ";"; + } + + return a.setCountry(country); + }); + + updateResult(pubs, toupdateresult, outputPath, "publication"); + updateResult(dss, toupdateresult, outputPath, "dataset"); + updateResult(sfw, toupdateresult, outputPath, "software"); + updateResult(orp, toupdateresult, outputPath, "otherresearchproduct"); + //we use leftOuterJoin because we want to rebuild the entire structure + + } + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + results.leftOuterJoin(toupdateresult) + .map(c -> { + OafEntity oaf = c._2()._1(); + List qualifierList = null; + if (oaf.getClass() == Publication.class) { + qualifierList = ((Publication) oaf).getCountry(); + + } + if (oaf.getClass() == Dataset.class){ + qualifierList = ((Dataset) oaf).getCountry(); + } + + if (oaf.getClass() == Software.class){ + qualifierList = ((Software) oaf).getCountry(); + } + + if (oaf.getClass() == OtherResearchProduct.class){ + qualifierList = ((OtherResearchProduct) oaf).getCountry(); + } + + if (c._2()._2().isPresent()) { + HashSet countries = new HashSet<>(); + for (Qualifier country : qualifierList) { + countries.add(country.getClassid()); + } + TypedRow t = c._2()._2().get(); + + for (String country : t.getCountry().split(";")) { + if (!countries.contains(country)) { + Qualifier q = new Qualifier(); + q.setClassid(country); + qualifierList.add(q); + } + + } + if (oaf.getClass() == Publication.class) { + ((Publication) oaf).setCountry(qualifierList); + return (Publication) oaf; + + } + if (oaf.getClass() == Dataset.class){ + ((Dataset) oaf).setCountry(qualifierList); + return (Dataset) oaf; + } + + if (oaf.getClass() == Software.class){ + ((Software) oaf).setCountry(qualifierList); + return (Software) oaf; + } + + if (oaf.getClass() == OtherResearchProduct.class){ + ((OtherResearchProduct) oaf).setCountry(qualifierList); + return (OtherResearchProduct) oaf; + } + } + + + return null; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); + } + + private static List getTypedRows(OafEntity oaf) { + List lst = new ArrayList<>(); + Set datasources_provenance = new HashSet<>(); + List instanceList = null; + String type = ""; + if (oaf.getClass() == Publication.class) { + instanceList = ((Publication) oaf).getInstance(); + type = "publication"; + } + if (oaf.getClass() == Dataset.class){ + instanceList = ((Dataset)oaf).getInstance(); + type = "dataset"; + } + + if (oaf.getClass() == Software.class){ + instanceList = ((Software)oaf).getInstance(); + type = "software"; + } + + if (oaf.getClass() == OtherResearchProduct.class){ + instanceList = ((OtherResearchProduct)oaf).getInstance(); + type = "otherresearchproduct"; + } + + + for (Instance i : instanceList) { + datasources_provenance.add(i.getCollectedfrom().getKey()); + datasources_provenance.add(i.getHostedby().getKey()); + } + for (String dsId : datasources_provenance) { + lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type)); + } + return lst; + } + + + private static JavaPairRDD getResults(JavaSparkContext sc , String inputPath){ + + return + sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) + .filter(ds -> !ds.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId())) + .mapToPair(toPair()) + .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) + .filter(o -> !o.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId())) + .mapToPair(toPair())) + .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) + .filter(s -> !s.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId())) + .mapToPair(toPair())) + .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId())) + .mapToPair(toPair())); + + + } + + + + private static PairFunction toPair() { + return e -> new Tuple2<>( e.getSourceId(), e); + + }; + + +} + diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java new file mode 100644 index 0000000000..f006a320b9 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java @@ -0,0 +1,70 @@ +package eu.dnetlib.dhp.countrypropagation; + + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class TypedRow implements Serializable { + private String sourceId; + private String targetId; + private String type; + private String country; + + public List getAccumulator() { + return accumulator; + } + + public TypedRow setAccumulator(List accumulator) { + this.accumulator = accumulator; + return this; + } + + private List accumulator; + + + public void add(String a){ + if (accumulator == null){ + accumulator = new ArrayList<>(); + } + accumulator.add(a); + } + + public Iterator getAccumulatorIterator(){ + return accumulator.iterator(); + } + + public String getCountry() { + return country; + } + + public TypedRow setCountry(String country) { + this.country = country; + return this; + } + + public String getSourceId() { + return sourceId; + } + public TypedRow setSourceId(String sourceId) { + this.sourceId = sourceId; + return this; + } + public String getTargetId() { + return targetId; + } + public TypedRow setTargetId(String targetId) { + this.targetId = targetId; + return this; + } + + public String getType() { + return type; + } + public TypedRow setType(String type) { + this.type = type; + return this; + } + +} diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties new file mode 100644 index 0000000000..7754151e87 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties @@ -0,0 +1,3 @@ +sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03 +sparkDriverMemory=15G +sparkExecutorMemory=15G \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json new file mode 100644 index 0000000000..03f3dd8ff0 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path of the sequential file to write", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml new file mode 100644 index 0000000000..1fef8b17c6 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -0,0 +1,55 @@ + + + + sourcePath + the source path + + + outputPath + the output path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + CountryPropagation + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob + dhp-graph-countrypropagation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${sourcePath} + --outputPath${outputPath} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index cf71190a43..2a9a72f30c 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -18,6 +18,8 @@ dhp-distcp dhp-graph-mapper dhp-dedup + dhp-bulktag + dhp-propagation