From bd0e504b42d695167731d9ab3fb25257e90de03d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 17 Feb 2020 18:04:15 +0100 Subject: [PATCH] changes to the wf configuration --- .../SparkCountryPropagationJob.java | 9 +- ...arkResultToOrganizationFromIstRepoJob.java | 326 ++++++++++++++++++ .../input_countrypropagation_parameters.json | 4 - .../input_countrypropagation_parameters.json | 26 ++ .../oozie_app/config-default.xml | 0 .../oozie_app/workflow.xml | 18 +- 6 files changed, 366 insertions(+), 17 deletions(-) delete mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json rename dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/{countrypropagation => graph}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/{countrypropagation => graph}/oozie_app/workflow.xml (76%) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index d395d85be..073c80d1b 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -9,16 +9,12 @@ import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; import org.apache.spark.sql.*; import scala.Tuple2; import java.io.File; import java.util.*; -import java.util.stream.Stream; public class SparkCountryPropagationJob { public static void main(String[] args) throws Exception { @@ -42,9 +38,8 @@ public class SparkCountryPropagationJob { directory.mkdirs(); } - //TODO: add as Job Parameters - List whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c"); - List allowedtypes = Arrays.asList("pubsrepository::institutional"); + List whitelist = Arrays.asList(parser.get("whitelist").split(";")); + List allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";")); JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 1ea62d62b..bfb2562d4 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -1,4 +1,330 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import org.apache.commons.io.IOUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.io.File; + public class SparkResultToOrganizationFromIstRepoJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_resulttoorganizationfrominstrepo_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories"; + + File directory = new File(outputPath); + + if (!directory.exists()) { + directory.mkdirs(); + } + } + } +/* +package eu.dnetlib.dhp.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.*; +import scala.Tuple2; + +import java.io.File; +import java.util.*; +import java.util.stream.Stream; + +public class SparkCountryPropagationJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkCountryPropagationJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories"; + + File directory = new File(outputPath); + + if(!directory.exists()){ + directory.mkdirs(); + } + + //TODO: add as Job Parameters + List whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c"); + List allowedtypes = Arrays.asList("pubsrepository::institutional"); + + + JavaPairRDD organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class)) + .filter(org -> !org.getDataInfo().getDeletedbyinference()) + .map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid())) + .mapToPair(toPair()); + + JavaPairRDD organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()); + + JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) + .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid())) + .map(ds -> new TypedRow().setSourceId(ds.getId())) + .mapToPair(toPair()); + + + JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); + JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); + JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)); + JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); + + JavaPairRDD datasource_results = publications + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + }) + .union(datasets + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })) + .union(software + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })) + .union(other + .map(oaf -> getTypedRows(oaf)) + .flatMapToPair(f -> { + ArrayList> ret = new ArrayList<>(); + for (TypedRow t : f) { + ret.add(new Tuple2<>(t.getSourceId(), t)); + } + return ret.iterator(); + })); + + JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); + JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); + + JavaPairRDD datasource_country = organizations.join(organization_datasource) + .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) + .mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation) + + + JavaPairRDD alloweddatasources_country = datasources.join(datasource_country) + .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2())); + + + JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_results) + .map(u -> u._2()._2().setCountry(u._2()._1().getCountry())) + .mapToPair(toPair()) + .reduceByKey((a, p) -> { + if (a == null) { + return p; + } + if (p == null) { + return a; + } + HashSet countries = new HashSet(); + countries.addAll(Arrays.asList(a.getCountry().split(";"))); + countries.addAll(Arrays.asList(p.getCountry().split(";"))); + String country = new String(); + for (String c : countries) { + country += c + ";"; + } + + return a.setCountry(country); + }); + + updateResult(pubs, toupdateresult, outputPath, "publication"); + updateResult(dss, toupdateresult, outputPath, "dataset"); + updateResult(sfw, toupdateresult, outputPath, "software"); + updateResult(orp, toupdateresult, outputPath, "otherresearchproduct"); + //we use leftOuterJoin because we want to rebuild the entire structure + + } + + private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { + results.leftOuterJoin(toupdateresult) + .map(c -> { + OafEntity oaf = c._2()._1(); + List qualifierList = null; + if (oaf.getClass() == Publication.class) { + qualifierList = ((Publication) oaf).getCountry(); + + } + if (oaf.getClass() == Dataset.class){ + qualifierList = ((Dataset) oaf).getCountry(); + } + + if (oaf.getClass() == Software.class){ + qualifierList = ((Software) oaf).getCountry(); + } + + if (oaf.getClass() == OtherResearchProduct.class){ + qualifierList = ((OtherResearchProduct) oaf).getCountry(); + } + + if (c._2()._2().isPresent()) { + HashSet countries = new HashSet<>(); + for (Qualifier country : qualifierList) { + countries.add(country.getClassid()); + } + TypedRow t = c._2()._2().get(); + + for (String country : t.getCountry().split(";")) { + if (!countries.contains(country)) { + Qualifier q = new Qualifier(); + q.setClassid(country); + qualifierList.add(q); + } + + } + if (oaf.getClass() == Publication.class) { + ((Publication) oaf).setCountry(qualifierList); + return (Publication) oaf; + + } + if (oaf.getClass() == Dataset.class){ + ((Dataset) oaf).setCountry(qualifierList); + return (Dataset) oaf; + } + + if (oaf.getClass() == Software.class){ + ((Software) oaf).setCountry(qualifierList); + return (Software) oaf; + } + + if (oaf.getClass() == OtherResearchProduct.class){ + ((OtherResearchProduct) oaf).setCountry(qualifierList); + return (OtherResearchProduct) oaf; + } + } + + + return null; + }) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); + } + + private static List getTypedRows(OafEntity oaf) { + List lst = new ArrayList<>(); + Set datasources_provenance = new HashSet<>(); + List instanceList = null; + String type = ""; + if (oaf.getClass() == Publication.class) { + instanceList = ((Publication) oaf).getInstance(); + type = "publication"; + } + if (oaf.getClass() == Dataset.class){ + instanceList = ((Dataset)oaf).getInstance(); + type = "dataset"; + } + + if (oaf.getClass() == Software.class){ + instanceList = ((Software)oaf).getInstance(); + type = "software"; + } + + if (oaf.getClass() == OtherResearchProduct.class){ + instanceList = ((OtherResearchProduct)oaf).getInstance(); + type = "otherresearchproduct"; + } + + + for (Instance i : instanceList) { + datasources_provenance.add(i.getCollectedfrom().getKey()); + datasources_provenance.add(i.getHostedby().getKey()); + } + for (String dsId : datasources_provenance) { + lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type)); + } + return lst; + } + + + private static JavaPairRDD getResults(JavaSparkContext sc , String inputPath){ + + return + sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)) + .filter(ds -> !ds.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId())) + .mapToPair(toPair()) + .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)) + .filter(o -> !o.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId())) + .mapToPair(toPair())) + .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)) + .filter(s -> !s.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId())) + .mapToPair(toPair())) + .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)) + .filter(p -> !p.getDataInfo().getDeletedbyinference()) + .map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId())) + .mapToPair(toPair())); + + + } + + + + private static PairFunction toPair() { + return e -> new Tuple2<>( e.getSourceId(), e); + + }; + + } + + + */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json deleted file mode 100644 index 6cc21c544..000000000 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json +++ /dev/null @@ -1,4 +0,0 @@ -[ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} -] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json new file mode 100644 index 000000000..cbafdcd46 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName":"mt", + "paramLongName":"master", + "paramDescription": "should be local or yarn", + "paramRequired": true, + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true, + }, + { + "paramName":"wl", + "paramLongName":"whitelist", + "paramDescription": "datasource id that will be considered even if not in the allowed typology list. Split by ;", + "paramRequired": true + }, + { + "paramName":"at", + "paramLongName":"allowedtypes", + "paramDescription": "the types of the allowed datasources. Split by ;", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml rename to dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml similarity index 76% rename from dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml rename to dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index dd30e3e0a..af88c6fbb 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -1,12 +1,16 @@ - + sourcePath the source path - outputPath - the output path + whitelist + the white list + + + allowedtypes + the allowed types sparkDriverMemory @@ -22,7 +26,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -36,15 +40,17 @@ cluster CountryPropagation eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob - dhp-graph-countrypropagation-${projectVersion}.jar + dhp-propagation-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" - --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster --sourcePath${sourcePath} + --withelist${whitelist} + --allowedtypes${allowedtypes}