diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
index 18884a327..c3d2d4c2c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
@@ -2,18 +2,17 @@
 package eu.dnetlib.dhp.oa.graph.dump.funderresults;
 
 import java.io.Serializable;
-import java.util.List;
 
 import eu.dnetlib.dhp.schema.dump.oaf.Result;
 
-public class FunderResults implements Serializable {
-	private List<Result> results;
+public class FunderResults extends Result implements Serializable {
+	private String funder_id;
 
-	public List<Result> getResults() {
-		return results;
+	public String getFunder_id() {
+		return funder_id;
 	}
 
-	public void setResults(List<Result> results) {
-		this.results = results;
+	public void setFunder_id(String funder_id) {
+		this.funder_id = funder_id;
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
similarity index 66%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index 21b4f4dc7..f84a5fd11 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -9,7 +9,6 @@ import java.util.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -21,7 +20,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
-import eu.dnetlib.dhp.schema.dump.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
 import scala.Tuple2;
@@ -30,13 +28,13 @@ import scala.Tuple2;
  * Project, a serialization of an instance af ResultProject closs is done. ResultProject contains the resultId, and the
  * list of Projects (as in eu.dnetlib.dhp.schema.dump.oaf.community.Project) it is associated to
  */
-public class SparkPrepareResultProject implements Serializable {
-	private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class);
+public class SparkDumpFunderResults implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults.class);
 
 	public static void main(String[] args) throws Exception {
 		String jsonConfiguration = IOUtils
 			.toString(
-				SparkPrepareResultProject.class
+				SparkDumpFunderResults.class
 					.getResourceAsStream(
 						"/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
 
@@ -65,12 +63,12 @@ public class SparkPrepareResultProject implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath);
-				prepareResultProjectList(spark, inputPath, outputPath, communityMapPath);
+				writeResultProjectList(spark, inputPath, outputPath, communityMapPath);
 			});
 	}
 
-	private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath,
-		String communityMapPath) {
+	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
+		String communityMapPath) {
 
 		CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
 
@@ -85,36 +83,17 @@
 			.union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
 
 		result
-			.joinWith(relation, result.col("id").equalTo(relation.col("target")))
-			.groupByKey(
-				(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) value -> value
-					._2()
-					.getSource()
-					.substring(3, 15),
-				Encoders.STRING())
-			.mapGroups(
-				(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, Tuple2<String, FunderResults>>) (
-					s, it) -> {
-					Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation> first = it.next();
-					FunderResults fr = new FunderResults();
-					List<Result> resultList = new ArrayList<>();
-					resultList.add(ResultMapper.map(first._1(), communityMap, true));
-					it.forEachRemaining(c -> {
-						resultList.add(ResultMapper.map(c._1(), communityMap, true));
+			.joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
+			.map((MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, FunderResults>) value ->{
+				FunderResults res = (FunderResults) ResultMapper.map(value._1(), communityMap, false);
+				res.setFunder_id(value._2().getSource().substring(3,15));
+				return res;
+			}, Encoders.bean(FunderResults.class))
+			.write()
+			.partitionBy("funder_id")
+			.mode(SaveMode.Overwrite)
+			.json(outputPath);
 
-					});
-					fr.setResults(resultList);
-					return new Tuple2<>(s, fr);
-				}, Encoders.tuple(Encoders.STRING(), Encoders.bean(FunderResults.class)))
-			.foreach(t -> {
-				String funder = t._1();
-				spark
-					.createDataFrame(t._2.getResults(), Result.class)
-					.write()
-					.mode(SaveMode.Overwrite)
-					.option("compression", "gzip")
-					.json(outputPath + "/" + funder);
-			});
 
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
index 5ddab3398..aca8f9714 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
@@ -137,7 +137,7 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Dump funder results</name>
-            <class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkPrepareResultProject</class>
+            <class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
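
Note: the rewritten job produces a single JSON dataset partitioned by funder (partitionBy("funder_id")) instead of materialising one DataFrame per funder. A minimal sketch, not part of this change, of how the partitioned dump could be read back with standard Spark partition discovery; the class name, the output-path argument and the example funder id ("corda__h2020", i.e. substring(3, 15) of an H2020 project identifier) are illustrative assumptions.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadFunderDump { // hypothetical helper, not part of this PR
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("ReadFunderDump").getOrCreate();

		// funder_id is recovered as a partition column from the funder_id=<value> directories
		Dataset<Row> dump = spark.read().json(args[0]); // args[0]: the outputPath used by the dump job

		// e.g. all results funded by H2020 (assumed example of a funder_id value)
		dump.filter(col("funder_id").equalTo("corda__h2020")).show(10, false);

		spark.stop();
	}
}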