[Dump Funders] new code for the dump of products related to funders

This commit is contained in:
Miriam Baglioni 2022-03-24 10:56:30 +01:00
parent 9ba598a9b5
commit 5331dea71b
16 changed files with 344 additions and 259 deletions

View File

@@ -80,11 +80,13 @@ public class SparkPrepareResultProject implements Serializable {
	private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
		Dataset<Relation> relation = Utils
-			.readPath(spark, inputPath + "/relation", Relation.class)
-			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
-				r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PRODUCED_BY));
+			.readPath(spark, inputPath + "/relation", Relation.class)
+			.filter(
+				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+					r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PRODUCED_BY));
-		Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
+		Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
+			.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
projects
.joinWith(relation, projects.col("id").equalTo(relation.col("target")), "inner")
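
The tuples produced by this join are subsequently grouped into the ResultProject records that the funder-dump jobs below read back as prepared info. A minimal sketch of that record's shape, assuming only the two accessors visible elsewhere in this commit (col("resultId") and getProjectsList()); this is an illustration, not the class as committed:

	// Hypothetical illustration of the prepared-info record: one entry per
	// result id, carrying the dump-schema projects that fund it. Field names
	// match the accessors used below: col("resultId"), getProjectsList().
	public class ResultProject implements Serializable {
		private String resultId;
		private List<eu.dnetlib.dhp.schema.dump.oaf.community.Project> projectsList;

		public String getResultId() { return resultId; }
		public void setResultId(String resultId) { this.resultId = resultId; }
		public List<eu.dnetlib.dhp.schema.dump.oaf.community.Project> getProjectsList() { return projectsList; }
		public void setProjectsList(List<eu.dnetlib.dhp.schema.dump.oaf.community.Project> projectsList) { this.projectsList = projectsList; }
	}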

View File

@@ -1,32 +1,25 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;

-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.ForeachFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.*;
-import org.jetbrains.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
-import scala.Tuple2;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
 * Splits the dumped results by funder and stores them in a folder named after the funder nsp (for all funders
 * except the EC, for which the fundingStream (FP7 or H2020) is also appended to the folder name).
@@ -37,18 +30,18 @@ public class SparkDumpFunderResults implements Serializable {
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkDumpFunderResults.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
@@ -57,81 +50,67 @@ public class SparkDumpFunderResults implements Serializable {
		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String graphPath = parser.get("graphPath");
		log.info("relationPath: {}", graphPath);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
-				writeResultProjectList(spark, inputPath, outputPath, graphPath);
+				writeResultProjectList(spark, inputPath, outputPath);
			});
	}

-	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
-		String graphPath) {
+	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) {
-		Dataset<String> funderList = Utils
+		Dataset<CommunityResult> result = Utils
			.readPath(spark, inputPath + "/publication", CommunityResult.class)
			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class))
-			.flatMap((FlatMapFunction<CommunityResult, String>) cr ->
-				cr.getProjects().stream().map(p -> p.getFunder().getShortName()).collect(Collectors.toList()).iterator()
-				, Encoders.STRING())
-			.distinct();
+			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
-		Dataset<CommunityResult> pubs;
-		Dataset<CommunityResult> result;
-		pubs = Utils
-			.readPath(spark, inputPath + "/publication", CommunityResult.class);
-		Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
-		Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
-		Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
-		result = pubs.union(dats).union(orp).union(sw);
-		funderList.foreach((ForeachFunction<String>) funder ->
-			getFunderResult(funder, inputPath, spark)
-				.write()
-				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
-				.json(outputPath + "/" + funder));
+		List<String> funderList = result.flatMap((FlatMapFunction<CommunityResult, String>) cr ->
+			cr.getProjects().stream().map(p -> {
+				String fName = p.getFunder().getShortName();
+				if (fName.equalsIgnoreCase("ec")) {
+					fName += "_" + p.getFunder().getFundingStream();
+				}
+				return fName;
+			}).collect(Collectors.toList()).iterator()
+			, Encoders.STRING()).distinct().collectAsList();
+		funderList.forEach(funder -> {
+			dumpResults(funder, result, outputPath);
+		});
	}

+	private static void dumpResults(String funder, Dataset<CommunityResult> results, String outputPath) {
-	@Nullable
-	private static Dataset<CommunityResult> getFunderResult(String funderName, String inputPath, SparkSession spark) {
-		Dataset<CommunityResult> pubs;
-		Dataset<CommunityResult> result;
-		pubs = Utils
-			.readPath(spark, inputPath + "/publication", CommunityResult.class);
-		Dataset<CommunityResult> dats = Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class);
-		Dataset<CommunityResult> orp = Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class);
-		Dataset<CommunityResult> sw = Utils.readPath(spark, inputPath + "/software", CommunityResult.class);
-		result = pubs.union(dats).union(orp).union(sw);
-		Dataset<CommunityResult> tmp = result.map((MapFunction<CommunityResult, CommunityResult>) cr -> {
-			if (!Optional.ofNullable(cr.getProjects()).isPresent()) {
+		results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
+			if (!Optional.ofNullable(r.getProjects()).isPresent()) {
				return null;
			}
-			for (Project p : cr.getProjects()) {
-				if (p.getFunder().getShortName().equalsIgnoreCase(funderName)) {
-					return cr;
+			for (Project p : r.getProjects()) {
+				String fName = p.getFunder().getShortName();
+				if (fName.equalsIgnoreCase("ec")) {
+					fName += "_" + p.getFunder().getFundingStream();
+				}
+				if (fName.equalsIgnoreCase(funder)) {
+					return r;
				}
			}
			return null;
		}, Encoders.bean(CommunityResult.class))
-			.filter(Objects::nonNull);
-		System.out.println(tmp.count());
-		return tmp;
+			.filter(Objects::nonNull)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "/" + funder);
	}
}
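
The naming rule from the javadoc is the heart of this change: the output folder is the funder's short name, except for the EC, whose folder name also carries the funding stream. The same logic appears inline twice above (in the flatMap and in the per-funder filter); pulled out as a standalone helper it would read roughly like this (a sketch for clarity, not code from the commit):

	// Sketch: derive the output-folder label for a project's funder.
	// EC results are additionally split by funding stream, e.g. EC_FP7 and EC_H2020.
	private static String funderLabel(Project p) {
		String fName = p.getFunder().getShortName();
		if (fName.equalsIgnoreCase("ec")) {
			fName += "_" + p.getFunder().getFundingStream();
		}
		return fName;
	}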

View File

@@ -0,0 +1,119 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

/**
 * Splits the dumped results by funder and stores them in a folder named after the funder nsp (for all funders
 * except the EC, for which the fundingStream (FP7 or H2020) is also appended to the folder name).
 */
public class SparkDumpFunderResults2 implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults2.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkDumpFunderResults2.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String graphPath = parser.get("graphPath");
		log.info("relationPath: {}", graphPath);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				writeResultProjectList(spark, inputPath, outputPath);
			});
	}

	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) {
		Dataset<CommunityResult> result = Utils
			.readPath(spark, inputPath + "/publication", CommunityResult.class)
			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));

		List<String> funderList = result.flatMap((FlatMapFunction<CommunityResult, String>) cr ->
			cr.getProjects().stream().map(p -> {
				String fName = p.getFunder().getShortName();
				if (fName.equalsIgnoreCase("ec")) {
					fName += "_" + p.getFunder().getFundingStream();
				}
				return fName;
			}).collect(Collectors.toList()).iterator()
			, Encoders.STRING()).distinct().collectAsList();

		funderList.forEach(funder -> {
			dumpResults(funder, result, outputPath);
		});
	}

	private static void dumpResults(String funder, Dataset<CommunityResult> results, String outputPath) {
		results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
			if (!Optional.ofNullable(r.getProjects()).isPresent()) {
				return null;
			}
			for (Project p : r.getProjects()) {
				String fName = p.getFunder().getShortName();
				if (fName.equalsIgnoreCase("ec")) {
					fName += "_" + p.getFunder().getFundingStream();
				}
				if (fName.equalsIgnoreCase(funder)) {
					return r;
				}
			}
			return null;
		}, Encoders.bean(CommunityResult.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/" + funder);
	}
}
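
Since the job no longer consumes --graphPath, invoking it takes only the source and output paths, exactly as the updated SplitPerFunderTest below drives it (workingDir and sourcePath stand in for real locations):

	SparkDumpFunderResults.main(new String[] {
		"-isSparkSessionManaged", Boolean.FALSE.toString(),
		"-outputPath", workingDir.toString() + "/split",
		"-sourcePath", sourcePath
	});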

View File

@@ -7,12 +7,6 @@ import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;

-import eu.dnetlib.dhp.oa.graph.dump.Constants;
-import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
-import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
-import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
-import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
-import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@@ -27,8 +21,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -82,36 +82,37 @@ public class SparkResultLinkedToProject implements Serializable {
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
-				writeResultsLinkedToProjects(communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
+				writeResultsLinkedToProjects(
+					communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
			});
	}

-	private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark, Class<R> inputClazz,
+	private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark,
+		Class<R> inputClazz,
		String inputPath, String outputPath, String resultProjectsPath) {
		Dataset<R> results = Utils
			.readPath(spark, inputPath, inputClazz)
-			.filter((FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
-				!r.getDataInfo().getInvisible());
+			.filter(
+				(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
+					!r.getDataInfo().getInvisible());
		Dataset<ResultProject> resultProjectDataset = Utils
-			.readPath(spark, resultProjectsPath , ResultProject.class);
+			.readPath(spark, resultProjectsPath, ResultProject.class);
		CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
-		results.joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
-			.map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 ->
-			{
-				CommunityResult cr = (CommunityResult) ResultMapper.map(t2._1(),
-					communityMap, Constants.DUMPTYPE.FUNDER.getType());
-				cr.setProjects(t2._2().getProjectsList());
-				return cr;
-			}
-			, Encoders.bean(CommunityResult.class))
+		results
+			.joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
+			.map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 -> {
+				CommunityResult cr = (CommunityResult) ResultMapper
+					.map(
+						t2._1(),
+						communityMap, Constants.DUMPTYPE.FUNDER.getType());
+				cr.setProjects(t2._2().getProjectsList());
+				return cr;
+			}, Encoders.bean(CommunityResult.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);
	}
}

View File

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap;
import static eu.dnetlib.dhp.common.collection.DecompressTarGz.doExtract;
@@ -27,90 +28,90 @@ import eu.dnetlib.dhp.oa.graph.hostedbymap.model.doaj.DOAJEntry;
public class ExtractAndMapDoajJson {

	private static final Logger log = LoggerFactory.getLogger(ExtractAndMapDoajJson.class);

	public static void main(final String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					Objects
						.requireNonNull(
							ExtractAndMapDoajJson.class
								.getResourceAsStream(
									"/eu/dnetlib/dhp/oa/graph/hostedbymap/download_json_parameters.json"))));

		parser.parseArgument(args);

		final String compressedInput = parser.get("compressedFile");
		log.info("compressedInput {}", compressedInput);

		final String hdfsNameNode = parser.get("hdfsNameNode");
		log.info("hdfsNameNode {}", hdfsNameNode);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath {}", outputPath);

		final String workingPath = parser.get("workingPath");
		log.info("workingPath {}", workingPath);

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfsNameNode);

		FileSystem fs = FileSystem.get(conf);
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec");
		doExtract(fs, workingPath, compressedInput);
		doMap(fs, workingPath, outputPath, codec);
	}

	private static void doMap(FileSystem fs, String workingPath, String outputPath, CompressionCodec codec)
		throws IOException {
		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs
			.listFiles(
				new Path(workingPath), true);
		Path hdfsWritePath = new Path(outputPath);
		if (fs.exists(hdfsWritePath)) {
			fs.delete(hdfsWritePath, true);
		}
		try (
			FSDataOutputStream out = fs
				.create(hdfsWritePath);
			PrintWriter writer = new PrintWriter(new BufferedOutputStream(out))) {
			while (fileStatusListIterator.hasNext()) {
				Path path = fileStatusListIterator.next().getPath();
				if (!fs.isDirectory(path)) {
					FSDataInputStream is = fs.open(path);
					CompressionInputStream compressionInputStream = codec.createInputStream(is);
					DOAJEntry[] doajEntries = new ObjectMapper().readValue(compressionInputStream, DOAJEntry[].class);
					Arrays.stream(doajEntries).forEach(doaj -> {
						try {
							writer.println(new ObjectMapper().writeValueAsString(getDoajModel(doaj)));
						} catch (JsonProcessingException e) {
							e.printStackTrace();
						}
					});
				}
			}
		}
	}

	@NotNull
	public static DOAJModel getDoajModel(DOAJEntry doaj) {
		DOAJModel doajModel = new DOAJModel();
		doajModel.setOaStart(doaj.getBibjson().getOa_start());
		doajModel.setEissn(doaj.getBibjson().getEissn());
		doajModel.setIssn(doaj.getBibjson().getPissn());
		doajModel.setJournalTitle(doaj.getBibjson().getTitle());
		doajModel.setReviewProcess(doaj.getBibjson().getEditorial().getReview_process());
		return doajModel;
	}
}
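
Per part file, doMap decompresses the stream, parses it as an array of DOAJEntry, flattens each entry through getDoajModel, and writes one JSON record per line. A local sketch of just that mapping step, without HDFS; the field names in the sample JSON (bibjson, eissn, pissn, title, oa_start, editorial.review_process) are assumptions inferred from the getters above, not taken from the commit:

	// Hedged sketch: parse a hypothetical DOAJ export snippet and print the
	// flattened DOAJModel records, mirroring what doMap writes line by line.
	ObjectMapper mapper = new ObjectMapper();
	String sample = "[{\"bibjson\": {\"eissn\": \"1234-5678\", \"pissn\": \"8765-4321\", "
		+ "\"title\": \"Some Journal\", \"oa_start\": 2010, "
		+ "\"editorial\": {\"review_process\": [\"Blind peer review\"]}}}]";
	DOAJEntry[] doajEntries = mapper.readValue(sample, DOAJEntry[].class);
	for (DOAJEntry doaj : doajEntries) {
		System.out.println(mapper.writeValueAsString(ExtractAndMapDoajJson.getDoajModel(doaj)));
	}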

View File

@@ -21,6 +21,6 @@
	"paramName": "gp",
	"paramLongName": "graphPath",
	"paramDescription": "the relationPath",
-	"paramRequired": true
+	"paramRequired": false
}
]
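
graphPath flips from required to optional here because the split job above no longer consumes it. A caller that still wants to read it can guard the lookup the same way the jobs guard isSparkSessionManaged; a minimal sketch, assuming parser.get returns null for an argument that was not supplied:

	// Sketch: read the now-optional graphPath defensively instead of assuming it.
	Optional<String> graphPath = Optional.ofNullable(parser.get("graphPath"));
	graphPath.ifPresent(gp -> log.info("graphPath: {}", gp));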

View File

@@ -107,29 +107,6 @@
			<error to="Kill"/>
		</action>

-<!-- <action name="select_relations">-->
-<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
-<!-- <master>yarn</master>-->
-<!-- <mode>cluster</mode>-->
-<!-- <name>Dump funder results </name>-->
-<!-- <class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkSelectRelations</class>-->
-<!-- <jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
-<!-- <spark-opts>-->
-<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
-<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
-<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
-<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
-<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
-<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
-<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
-<!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
-<!-- </spark-opts>-->
-<!-- <arg>&#45;&#45;sourcePath</arg><arg>${sourcePath}/relation</arg>-->
-<!-- <arg>&#45;&#45;outputPath</arg><arg>${workingDir}/preparedInfo</arg>-->
-<!-- </spark>-->
-<!-- <ok to="fork_result_linked_to_projects"/>-->
-<!-- <error to="Kill"/>-->
-<!-- </action>-->
<fork name="fork_result_linked_to_projects">
<path start="select_publication_linked_to_projects"/>
@@ -244,34 +221,6 @@
<join name="join_link" to="dump_funder_results"/>
<!-- <action name="common_action_community_funder">-->
<!-- <sub-workflow>-->
<!-- <app-path>${wf:appPath()}/dump_common-->
<!-- </app-path>-->
<!-- <propagate-configuration/>-->
<!-- <configuration>-->
<!-- <property>-->
<!-- <name>sourcePath</name>-->
<!-- <value>${sourcePath}</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>selectedResults</name>-->
<!-- <value>${workingDir}/result</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>communityMapPath</name>-->
<!-- <value>${workingDir}/communityMap</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>outputPath</name>-->
<!-- <value>${workingDir}</value>-->
<!-- </property>-->
<!-- </configuration>-->
<!-- </sub-workflow>-->
<!-- <ok to="dump_funder_results" />-->
<!-- <error to="Kill" />-->
<!-- </action>-->
<action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@@ -291,7 +240,6 @@
			</spark-opts>
			<arg>--sourcePath</arg><arg>${workingDir}/result</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
-			<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -326,20 +326,20 @@ public class PrepareResultProjectJobTest {
	void testMatchx() throws Exception {

		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
			.getPath();

-		SparkPrepareResultProject.main(new String[]{
-			"-isSparkSessionManaged", Boolean.FALSE.toString(),
-			"-outputPath", workingDir.toString() + "/preparedInfo",
-			"-sourcePath", sourcePath
+		SparkPrepareResultProject.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/preparedInfo",
+			"-sourcePath", sourcePath
		});

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));

		tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
	}

View File

@@ -6,7 +6,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;

-import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@@ -23,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -81,8 +81,8 @@ public class ResultLinkedToProjectTest {
			.getPath();

		final String communityMapPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
			.getPath();

		SparkResultLinkedToProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
@@ -90,7 +90,7 @@ public class ResultLinkedToProjectTest {
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
			"-graphPath", graphPath,
-			"-communityMapPath",communityMapPath
+			"-communityMapPath", communityMapPath
		});
@@ -112,12 +112,12 @@ public class ResultLinkedToProjectTest {
			.getPath();

		final String graphPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
			.getPath();

		final String communityMapPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
			.getPath();
SparkResultLinkedToProject.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
@@ -125,7 +125,7 @@ public class ResultLinkedToProjectTest {
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
			"-graphPath", graphPath,
			"-communityMapPath", communityMapPath
		});

View File

@@ -5,10 +5,14 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

+//import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults2;
+//import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkGetFunderList;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -68,14 +72,14 @@ public class SplitPerFunderTest {
	void test1() throws Exception {

		final String sourcePath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/extendeddump")
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext")
			.getPath();

		SparkDumpFunderResults.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/split",
-			"-sourcePath", sourcePath,
-			"-graphPath", sourcePath
+			"-sourcePath", sourcePath
		});
@@ -83,13 +87,13 @@ public class SplitPerFunderTest {
		// FP7 3 and H2020 3
		JavaRDD<CommunityResult> tmp = sc
-			.textFile(workingDir.toString() + "/split/EC")
+			.textFile(workingDir.toString() + "/split/EC_FP7")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));

		org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class));

-		Assertions.assertEquals(6, verificationDataset.count());
+		Assertions.assertEquals(3, verificationDataset.count());

		Assertions
			.assertEquals(
@@ -132,10 +136,10 @@ public class SplitPerFunderTest {
		Assertions.assertEquals(1, tmp.count());

		// H2020 3
-//		tmp = sc
-//			.textFile(workingDir.toString() + "/split/EC_H2020")
-//			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
-//		Assertions.assertEquals(3, tmp.count());
+		tmp = sc
+			.textFile(workingDir.toString() + "/split/EC_H2020")
+			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
+		Assertions.assertEquals(3, tmp.count());
// MZOS 1
tmp = sc
@@ -143,11 +147,9 @@ public class SplitPerFunderTest {
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(1, tmp.count());
// CONICYT 0
tmp = sc
.textFile(workingDir.toString() + "/split/CONICYTF")
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(0, tmp.count());
}
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,8 @@
NSF
CIHR
NWO
NHMRC
NIH
MZOS
SNSF
EC