From f738acb85a6eb57c01ffb806fdcd4c31532e1bd6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 31 Mar 2022 18:23:25 +0200 Subject: [PATCH] [Dump Funders] new code for the dump of products related to funders --- .../funderresults/SparkDumpFunderResults.java | 148 +++++++++++------- .../SparkDumpFunderResults2.java | 47 +++--- .../complete/oozie_app/workflow.xml | 1 + .../funder/oozie_app/workflow.xml | 4 + .../dump/funderresult/SplitPerFunderTest.java | 8 +- 5 files changed, 118 insertions(+), 90 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java index 2d283b2c0..96e6cefb5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java @@ -1,17 +1,7 @@ package eu.dnetlib.dhp.oa.graph.dump.funderresults; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; -import eu.dnetlib.dhp.schema.dump.oaf.community.Project; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.List; @@ -19,7 +9,21 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.community.Project; /** * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC @@ -30,18 +34,18 @@ public class SparkDumpFunderResults implements Serializable { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - SparkDumpFunderResults.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json")); + .toString( + SparkDumpFunderResults.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("sourcePath"); @@ -50,36 +54,35 @@ public class SparkDumpFunderResults implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - SparkConf conf = new SparkConf(); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - writeResultProjectList(spark, inputPath, outputPath); - }); + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + writeResultProjectList(spark, inputPath, outputPath); + }); } private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) { Dataset result = Utils - .readPath(spark, inputPath + "/publication", CommunityResult.class) - .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + log.info("Number of result {}", result.count()); - List funderList = result.flatMap((FlatMapFunction) cr -> - cr.getProjects().stream().map(p -> { - String fName = p.getFunder().getShortName(); - if (fName.equalsIgnoreCase("ec")) { - fName += "_" + p.getFunder().getFundingStream(); - } - return fName; - }).collect(Collectors.toList()).iterator() - , Encoders.STRING()).distinct().collectAsList(); + Dataset tmp = result + .flatMap((FlatMapFunction) cr -> cr.getProjects().stream().map(p -> { + return getFunderName(p); + + }).collect(Collectors.toList()).iterator(), Encoders.STRING()) + .distinct(); + tmp.foreach((ForeachFunction) f -> log.info("Found Funder {}", f)); + List funderList = tmp.collectAsList(); funderList.forEach(funder -> { dumpResults(funder, result, outputPath); @@ -87,30 +90,55 @@ public class SparkDumpFunderResults implements Serializable { } + @NotNull + private static String getFunderName(Project p) { + Optional ofunder = Optional.ofNullable(p.getFunder()); + if (ofunder.isPresent()) { + String fName = ofunder.get().getShortName(); + if (fName.equalsIgnoreCase("ec")) { + fName += "_" + ofunder.get().getFundingStream(); + } + return fName; + } else { + String fName = p.getId().substring(3, p.getId().indexOf("_")).toUpperCase(); + if (fName.equalsIgnoreCase("ec")) { + if (p.getId().contains("h2020")) { + fName += "_H2020"; + } else { + fName += "_FP7"; + } + } else if (fName.equalsIgnoreCase("conicytf")) { + fName = "CONICYT"; + } else if (fName.equalsIgnoreCase("dfgf")) { + fName = "DFG"; + } else if (fName.equalsIgnoreCase("tubitakf")) { + fName = "TUBITAK"; + } else if (fName.equalsIgnoreCase("euenvagency")) { + fName = "EEA"; + } + return fName; + } + } + private static void dumpResults(String funder, Dataset results, String outputPath) { results.map((MapFunction) r -> { - if (!Optional.ofNullable(r.getProjects()).isPresent()) { - return null; - } - for (Project p : r.getProjects()) { - String fName = p.getFunder().getShortName(); - if (fName.equalsIgnoreCase("ec")){ - fName += "_" + p.getFunder().getFundingStream(); - } - if (fName.equalsIgnoreCase(funder)) { - return r; - } - } - return null; - }, Encoders.bean(CommunityResult.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + funder); + if (!Optional.ofNullable(r.getProjects()).isPresent()) { + return null; + } + for (Project p : r.getProjects()) { + String fName = getFunderName(p); + if (fName.equalsIgnoreCase(funder)) { + return r; + } + } + return null; + }, Encoders.bean(CommunityResult.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + funder); } - - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults2.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults2.java index 46bc60e3a..579fa760d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults2.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults2.java @@ -1,10 +1,14 @@ package eu.dnetlib.dhp.oa.graph.dump.funderresults; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; -import eu.dnetlib.dhp.schema.dump.oaf.community.Project; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -13,13 +17,10 @@ import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import eu.dnetlib.dhp.schema.dump.oaf.community.Project; /** * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC @@ -72,16 +73,16 @@ public class SparkDumpFunderResults2 implements Serializable { .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class)) .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); - - List funderList = result.flatMap((FlatMapFunction) cr -> - cr.getProjects().stream().map(p -> { - String fName = p.getFunder().getShortName(); - if (fName.equalsIgnoreCase("ec")) { - fName += "_" + p.getFunder().getFundingStream(); - } - return fName; - }).collect(Collectors.toList()).iterator() - , Encoders.STRING()).distinct().collectAsList(); + List funderList = result + .flatMap((FlatMapFunction) cr -> cr.getProjects().stream().map(p -> { + String fName = p.getFunder().getShortName(); + if (fName.equalsIgnoreCase("ec")) { + fName += "_" + p.getFunder().getFundingStream(); + } + return fName; + }).collect(Collectors.toList()).iterator(), Encoders.STRING()) + .distinct() + .collectAsList(); funderList.forEach(funder -> { @@ -98,7 +99,7 @@ public class SparkDumpFunderResults2 implements Serializable { } for (Project p : r.getProjects()) { String fName = p.getFunder().getShortName(); - if (fName.equalsIgnoreCase("ec")){ + if (fName.equalsIgnoreCase("ec")) { fName += "_" + p.getFunder().getFundingStream(); } if (fName.equalsIgnoreCase(funder)) { @@ -114,6 +115,4 @@ public class SparkDumpFunderResults2 implements Serializable { .json(outputPath + "/" + funder); } - - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app/workflow.xml index 8eab56992..7cc133323 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app/workflow.xml @@ -298,6 +298,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=3840 --sourcePath${sourcePath} --outputPath${workingDir}/validrelation diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml index dfcd8d3d8..2c05256cf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml @@ -136,6 +136,7 @@ --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${workingDir}/result/publication --graphPath${workingDir}/preparedInfo + --communityMapPath${communityMapPath} @@ -162,6 +163,7 @@ --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${workingDir}/result/dataset --graphPath${workingDir}/preparedInfo + --communityMapPath${communityMapPath} @@ -188,6 +190,7 @@ --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${workingDir}/result/otherresearchproduct --graphPath${workingDir}/preparedInfo + --communityMapPath${communityMapPath} @@ -214,6 +217,7 @@ --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${workingDir}/result/software --graphPath${workingDir}/preparedInfo + --communityMapPath${communityMapPath} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java index b8a248d58..e9d739db2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java @@ -5,8 +5,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -//import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults2; -//import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkGetFunderList; +// import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults2; +// import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkGetFunderList; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -75,7 +75,6 @@ public class SplitPerFunderTest { .getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext") .getPath(); - SparkDumpFunderResults.main(new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), "-outputPath", workingDir.toString() + "/split", @@ -147,9 +146,6 @@ public class SplitPerFunderTest { .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(1, tmp.count()); - - } - }