This commit is contained in:
Miriam Baglioni 2020-11-18 18:56:48 +01:00
parent 906db690d2
commit fafb688887
3 changed files with 23 additions and 45 deletions

View File

@ -2,18 +2,17 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults; package eu.dnetlib.dhp.oa.graph.dump.funderresults;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.schema.dump.oaf.Result; import eu.dnetlib.dhp.schema.dump.oaf.Result;
public class FunderResults implements Serializable { public class FunderResults extends Result implements Serializable {
private List<Result> results; private String funder_id;
public List<Result> getResults() { public String getFunder_id() {
return results; return funder_id;
} }
public void setResults(List<Result> results) { public void setFunder_id(String funder_id) {
this.results = results; this.funder_id = funder_id;
} }
} }

View File

@ -9,7 +9,6 @@ import java.util.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -21,7 +20,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper; import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2; import scala.Tuple2;
@ -30,13 +28,13 @@ import scala.Tuple2;
* Project, a serialization of an instance af ResultProject closs is done. ResultProject contains the resultId, and the * Project, a serialization of an instance af ResultProject closs is done. ResultProject contains the resultId, and the
* list of Projects (as in eu.dnetlib.dhp.schema.dump.oaf.community.Project) it is associated to * list of Projects (as in eu.dnetlib.dhp.schema.dump.oaf.community.Project) it is associated to
*/ */
public class SparkPrepareResultProject implements Serializable { public class SparkDumpFunderResults implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class); private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
SparkPrepareResultProject.class SparkDumpFunderResults.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json")); "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
@ -65,12 +63,12 @@ public class SparkPrepareResultProject implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
prepareResultProjectList(spark, inputPath, outputPath, communityMapPath); writeResultProjectList(spark, inputPath, outputPath, communityMapPath);
}); });
} }
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath, private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
String communityMapPath) { String communityMapPath) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
@ -85,36 +83,17 @@ public class SparkPrepareResultProject implements Serializable {
.union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class)); .union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
result result
.joinWith(relation, result.col("id").equalTo(relation.col("target"))) .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
.groupByKey( .map((MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, FunderResults>) value ->{
(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) value -> value FunderResults res = (FunderResults) ResultMapper.map(value._1(), communityMap, false);
._2() res.setFunder_id(value._2().getSource().substring(3,15));
.getSource() return res;
.substring(3, 15), }, Encoders.bean(FunderResults.class))
Encoders.STRING()) .write()
.mapGroups( .partitionBy("funder_id")
(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, Tuple2<String, FunderResults>>) ( .mode(SaveMode.Overwrite)
s, it) -> { .json(outputPath);
Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation> first = it.next();
FunderResults fr = new FunderResults();
List<Result> resultList = new ArrayList<>();
resultList.add(ResultMapper.map(first._1(), communityMap, true));
it.forEachRemaining(c -> {
resultList.add(ResultMapper.map(c._1(), communityMap, true));
});
fr.setResults(resultList);
return new Tuple2<>(s, fr);
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(FunderResults.class)))
.foreach(t -> {
String funder = t._1();
spark
.createDataFrame(t._2.getResults(), Result.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + funder);
});
} }
} }

View File

@ -137,7 +137,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Dump funder results </name> <name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkPrepareResultProject</class> <class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}