[Funder Products Dump] new way to avoid using hive

Miriam Baglioni 2022-06-21 17:52:27 +02:00
parent b94a791bc5
commit b98f904d48
25 changed files with 744 additions and 806 deletions

View File

@ -10,6 +10,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -81,8 +82,9 @@ public class SparkPrepareResultProject implements Serializable {
        Dataset<Relation> relation = Utils
            .readPath(spark, inputPath + "/relation", Relation.class)
            .filter(
-               "dataInfo.deletedbyinference = false and lower(relClass) = '"
-                   + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
                (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
                    r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PRODUCED_BY));
        Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
            .readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
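
The pattern above is the heart of this commit: the SQL-string filter, which needs Spark's SQL/Hive layer to parse it, becomes a typed FilterFunction evaluated directly on the bean. Below is a minimal, self-contained sketch of that pattern; the Rel bean, the sample data and the local master are placeholders of mine, not the project's Relation class or configuration.

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class TypedFilterSketch {

    // Hypothetical stand-in for the graph Relation bean.
    public static class Rel implements Serializable {
        private String relClass;
        private boolean deleted;

        public Rel() {
        }

        public Rel(String relClass, boolean deleted) {
            this.relClass = relClass;
            this.deleted = deleted;
        }

        public String getRelClass() {
            return relClass;
        }

        public void setRelClass(String relClass) {
            this.relClass = relClass;
        }

        public boolean isDeleted() {
            return deleted;
        }

        public void setDeleted(boolean deleted) {
            this.deleted = deleted;
        }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("typed-filter").getOrCreate();

        Dataset<Rel> rels = spark
            .createDataset(
                Arrays.asList(new Rel("isProducedBy", false), new Rel("isProducedBy", true)),
                Encoders.bean(Rel.class));

        // Instead of .filter("deleted = false and lower(relClass) = 'isproducedby'"),
        // the predicate runs on the bean itself: no SQL parsing, no Hive support needed.
        Dataset<Rel> kept = rels
            .filter((FilterFunction<Rel>) r -> !r.isDeleted() && r.getRelClass().equalsIgnoreCase("isProducedBy"));

        kept.show();
        spark.stop();
    }
}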

View File

@ -1,140 +1,119 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.dump.oaf.community.Funder;
import eu.dnetlib.dhp.schema.dump.oaf.community.Project;

/**
 * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC
 * for the EC it specifies also the fundingStream (FP7 or H2020)
 */
public class SparkDumpFunderResults implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkDumpFunderResults.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);
        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
-       final String graphPath = parser.get("graphPath");
-       log.info("relationPath: {}", graphPath);
        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
-               writeResultProjectList(spark, inputPath, outputPath, graphPath);
                writeResultProjectList(spark, inputPath, outputPath);
            });
    }

-   private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
-       String graphPath) {
-       Dataset<eu.dnetlib.dhp.schema.oaf.Project> project = Utils
-           .readPath(spark, graphPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
    private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) {
        Dataset<CommunityResult> result = Utils
            .readPath(spark, inputPath + "/publication", CommunityResult.class)
            .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
-           .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class))
            .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
            .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
        log.info("Number of result {}", result.count());
-       List<String> funderList = project
-           .select("id")
-           .map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
-           .distinct()
-           .collectAsList();
        Dataset<String> tmp = result
            .flatMap((FlatMapFunction<CommunityResult, String>) cr -> cr.getProjects().stream().map(p -> {
                return getFunderName(p);
            }).collect(Collectors.toList()).iterator(), Encoders.STRING())
            .distinct();
        List<String> funderList = tmp.collectAsList();
        funderList.forEach(funder -> {
-           String fundernsp = funder.substring(3);
-           String funderdump;
-           if (fundernsp.startsWith("corda")) {
-               funderdump = "EC_";
-               if (fundernsp.endsWith("h2020")) {
-                   funderdump += "H2020";
-               } else {
-                   funderdump += "FP7";
-               }
-           } else {
-               funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
-           }
-           writeFunderResult(funder, result, outputPath, funderdump);
            dumpResults(funder, result, outputPath);
        });
    }

-   private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
-       String funderName) {
-       results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
-           if (!Optional.ofNullable(r.getProjects()).isPresent()) {
-               return null;
-           }
-           for (Project p : r.getProjects()) {
-               if (p.getId().startsWith(nsp)) {
-                   if (nsp.startsWith("40|irb")) {
-                       if (p.getFunder().getShortName().equals(funderName))
-                           return r;
-                       else
-                           return null;
-                   }
-                   return r;
-               }
-           }
-           return null;
-       }, Encoders.bean(CommunityResult.class))
-           .filter(Objects::nonNull)
-           .write()
-           .mode(SaveMode.Overwrite)
-           .option("compression", "gzip")
-           .json(outputPath + "/" + funderName);
-   }
    @NotNull
    private static String getFunderName(Project p) {
        Optional<Funder> ofunder = Optional.ofNullable(p.getFunder());
        if (ofunder.isPresent()) {
            String fName = ofunder.get().getShortName();
            if (fName.equalsIgnoreCase("ec")) {
                fName += "_" + ofunder.get().getFundingStream();
            }
            return fName;
        } else {
            String fName = p.getId().substring(3, p.getId().indexOf("_")).toUpperCase();
            if (fName.equalsIgnoreCase("ec")) {
                if (p.getId().contains("h2020")) {
                    fName += "_H2020";
                } else {
                    fName += "_FP7";
                }
            } else if (fName.equalsIgnoreCase("conicytf")) {
                fName = "CONICYT";
            } else if (fName.equalsIgnoreCase("dfgf")) {
                fName = "DFG";
            } else if (fName.equalsIgnoreCase("tubitakf")) {
                fName = "TUBITAK";
            } else if (fName.equalsIgnoreCase("euenvagency")) {
                fName = "EEA";
            }
            return fName;
        }
    }

-   private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
-       String funderDump) {
-       if (funder.startsWith("40|irb")) {
-           dumpResults(funder, results, outputPath, "HRZZ");
-           dumpResults(funder, results, outputPath, "MZOS");
-       } else
-           dumpResults(funder, results, outputPath, funderDump);
-   }
    private static void dumpResults(String funder, Dataset<CommunityResult> results, String outputPath) {
        results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
            if (!Optional.ofNullable(r.getProjects()).isPresent()) {
                return null;
            }
            for (Project p : r.getProjects()) {
                String fName = getFunderName(p);
                if (fName.equalsIgnoreCase(funder)) {
                    return r;
                }
            }
            return null;
        }, Encoders.bean(CommunityResult.class))
            .filter(Objects::nonNull)
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/" + funder);
    }
}
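
For reference, the funder label that names each per-funder output folder is now taken from the dumped project itself (the funder short name, or the project id prefix as a fallback), as in getFunderName above. A standalone sketch of the same normalization rules, with plain strings standing in for the Project and Funder beans; the method name and the sample identifiers are illustrative only.

public class FunderLabelSketch {

    // shortName and fundingStream come from the dumped funder, when present;
    // projectId is the OpenAIRE project identifier, e.g. "40|corda__h2020::...".
    static String funderLabel(String shortName, String fundingStream, String projectId) {
        if (shortName != null) {
            return shortName.equalsIgnoreCase("ec") ? shortName + "_" + fundingStream : shortName;
        }
        String fName = projectId.substring(3, projectId.indexOf("_")).toUpperCase();
        if (fName.equalsIgnoreCase("ec")) {
            return projectId.contains("h2020") ? fName + "_H2020" : fName + "_FP7";
        }
        if (fName.equalsIgnoreCase("conicytf")) {
            return "CONICYT";
        }
        if (fName.equalsIgnoreCase("dfgf")) {
            return "DFG";
        }
        if (fName.equalsIgnoreCase("tubitakf")) {
            return "TUBITAK";
        }
        if (fName.equalsIgnoreCase("euenvagency")) {
            return "EEA";
        }
        return fName;
    }

    public static void main(String[] args) {
        System.out.println(funderLabel("EC", "H2020", null)); // EC_H2020
        System.out.println(funderLabel(null, null, "40|conicytf____::1234")); // CONICYT
        System.out.println(funderLabel("NWO", null, null)); // NWO
    }
}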

View File

@ -5,9 +5,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -18,11 +21,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;

/**
 * Selects the results linked to projects. Only for these results the dump will be performed.
@ -35,18 +45,18 @@ public class SparkResultLinkedToProject implements Serializable {

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkResultLinkedToProject.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        final String inputPath = parser.get("sourcePath");
@ -58,59 +68,51 @@ public class SparkResultLinkedToProject implements Serializable {
        final String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);
-       final String graphPath = parser.get("graphPath");
-       log.info("graphPath: {}", graphPath);
        final String resultProjectsPath = parser.get("graphPath");
        log.info("graphPath: {}", resultProjectsPath);
        String communityMapPath = parser.get("communityMapPath");
        @SuppressWarnings("unchecked")
        Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
        SparkConf conf = new SparkConf();
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, outputPath);
-               writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, graphPath);
                writeResultsLinkedToProjects(
                    communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
            });
    }

-   private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
-       String inputPath, String outputPath, String graphPath) {
    private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark,
        Class<R> inputClazz,
        String inputPath, String outputPath, String resultProjectsPath) {
        Dataset<R> results = Utils
            .readPath(spark, inputPath, inputClazz)
-           .filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
-       Dataset<Relation> relations = Utils
-           .readPath(spark, graphPath + "/relation", Relation.class)
-           .filter(
-               "dataInfo.deletedbyinference = false and lower(relClass) = '"
-                   + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
-       Dataset<Project> project = Utils.readPath(spark, graphPath + "/project", Project.class);
-       results.createOrReplaceTempView("result");
-       relations.createOrReplaceTempView("relation");
-       project.createOrReplaceTempView("project");
-       Dataset<R> tmp = spark
-           .sql(
-               "Select res.* " +
-                   "from relation rel " +
-                   "join result res " +
-                   "on rel.source = res.id " +
-                   "join project p " +
-                   "on rel.target = p.id " +
-                   "")
-           .as(Encoders.bean(inputClazz));
-       tmp
-           .groupByKey(
-               (MapFunction<R, String>) value -> value
-                   .getId(),
-               Encoders.STRING())
-           .mapGroups((MapGroupsFunction<String, R, R>) (k, it) -> it.next(), Encoders.bean(inputClazz))
-           .write()
-           .mode(SaveMode.Overwrite)
-           .option("compression", "gzip")
-           .json(outputPath);
            .filter(
                (FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
                    !r.getDataInfo().getInvisible());
        Dataset<ResultProject> resultProjectDataset = Utils
            .readPath(spark, resultProjectsPath, ResultProject.class);
        CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
        results
            .joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
            .map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 -> {
                CommunityResult cr = (CommunityResult) ResultMapper
                    .map(
                        t2._1(),
                        communityMap, Constants.DUMPTYPE.FUNDER.getType());
                cr.setProjects(t2._2().getProjectsList());
                return cr;
            }, Encoders.bean(CommunityResult.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}
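
The same idea removes the Hive dependency from the join: instead of registering temp views and running a SQL join, the job now joins two typed Datasets with joinWith and maps the resulting pairs. A minimal sketch of that join pattern follows; the tuple-encoded stand-ins and sample rows are placeholders, not the project's Result and ResultProject classes.

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class TypedJoinSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("typed-join").getOrCreate();

        // Left side: (resultId, title) pairs standing in for the dumped results.
        Dataset<Tuple2<String, String>> results = spark
            .createDataset(
                Arrays.asList(new Tuple2<>("r1", "some publication"), new Tuple2<>("r2", "some dataset")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // Right side: (resultId, projectCode) pairs standing in for the prepared result/project info.
        Dataset<Tuple2<String, String>> resultProjects = spark
            .createDataset(
                Arrays.asList(new Tuple2<>("r1", "EC_H2020-123456")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // Typed join on the shared identifier: no temp views, no SQL string, no Hive metastore.
        Dataset<String> enriched = results
            .joinWith(resultProjects, results.col("_1").equalTo(resultProjects.col("_1")))
            .map(
                (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, String>) t2 -> t2._1()._2()
                    + " -> " + t2._2()._2(),
                Encoders.STRING());

        enriched.show(false);
        spark.stop();
    }
}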

View File

@ -17,10 +17,10 @@
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "gp", "paramName": "gp",
"paramLongName": "graphPath", "paramLongName": "graphPath",
"paramDescription": "the relationPath", "paramDescription": "the relationPath",
"paramRequired": true "paramRequired": false
} }
] ]

View File

@ -0,0 +1,20 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -28,6 +28,12 @@
"paramLongName":"graphPath", "paramLongName":"graphPath",
"paramDescription": "the path to the relations", "paramDescription": "the path to the relations",
"paramRequired": true "paramRequired": true
},
{
"paramName":"cmp",
"paramLongName":"communityMapPath",
"paramDescription": "the path to the relations",
"paramRequired": true
} }
] ]

View File

@ -1,30 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,347 +0,0 @@
<workflow-app name="sub_dump_community_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>communityMapPath</name>
<description>the path to the community map</description>
</property>
<property>
<name>selectedResults</name>
<description>the path the the possible subset ot results to be dumped</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_dump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
<arg>--dumpType</arg><arg>${dumpType}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="prepareResultProject"/>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_extendWithProject"/>
<error to="Kill"/>
</action>
<fork name="fork_extendWithProject">
<path start="extend_publication"/>
<path start="extend_dataset"/>
<path start="extend_orp"/>
<path start="extend_software"/>
</fork>
<action name="extend_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped publications with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped dataset with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped ORP with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/orp</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped software with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<join name="join_extend" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -1,2 +0,0 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -77,42 +77,259 @@
</configuration>
</global>
-<start to="common_action_community_funder"/>
<start to="fork_dump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
-<action name="common_action_community_funder">
-<sub-workflow>
-<app-path>${wf:appPath()}/dump_common
-</app-path>
-<propagate-configuration/>
-<configuration>
-<property>
-<name>sourcePath</name>
-<value>${sourcePath}</value>
-</property>
-<property>
-<name>selectedResults</name>
-<value>${sourcePath}</value>
-</property>
-<property>
-<name>communityMapPath</name>
-<value>${workingDir}/communityMap</value>
-</property>
-<property>
-<name>outputPath</name>
-<value>${workingDir}</value>
-</property>
-</configuration>
-</sub-workflow>
-<ok to="splitForCommunities" />
-<error to="Kill" />
-</action>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
<arg>--dumpType</arg><arg>${dumpType}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="prepareResultProject"/>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_extendWithProject"/>
<error to="Kill"/>
</action>
<fork name="fork_extendWithProject">
<path start="extend_publication"/>
<path start="extend_dataset"/>
<path start="extend_orp"/>
<path start="extend_software"/>
</fork>
<action name="extend_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped publications with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped dataset with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped ORP with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/orp</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped software with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<join name="join_extend" to="splitForCommunities"/>
<action name="splitForCommunities"> <action name="splitForCommunities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -298,6 +298,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/validrelation</arg>

View File

@ -1,2 +0,0 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -77,12 +77,36 @@
</configuration>
</global>
-<start to="fork_result_linked_to_projects"/>
<start to="prepareResultProject"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_result_linked_to_projects"/>
<error to="Kill"/>
</action>
<fork name="fork_result_linked_to_projects"> <fork name="fork_result_linked_to_projects">
<path start="select_publication_linked_to_projects"/> <path start="select_publication_linked_to_projects"/>
@ -111,7 +135,8 @@
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
-<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -137,7 +162,8 @@
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
-<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -163,7 +189,8 @@
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
-<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -189,41 +216,14 @@
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
-<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
-<join name="join_link" to="common_action_community_funder"/>
<join name="join_link" to="dump_funder_results"/>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${workingDir}/result</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="dump_funder_results" />
<error to="Kill" />
</action>
<action name="dump_funder_results"> <action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -242,9 +242,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
-<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--sourcePath</arg><arg>${workingDir}/result</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
-<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -35,12 +35,12 @@ public class PrepareResultProjectJobTest {
    private static Path workingDir;
    private static final Logger log = LoggerFactory
        .getLogger(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files
            .createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
@ -54,10 +54,10 @@ public class PrepareResultProjectJobTest {
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession spark = SparkSession
.builder() .builder()
.appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()) .appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName())
.config(conf) .config(conf)
.getOrCreate(); .getOrCreate();
} }
@AfterAll @AfterAll
@ -70,23 +70,23 @@ public class PrepareResultProjectJobTest {
    void testNoMatch() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/no_match")
            .getPath();

        SparkPrepareResultProject.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/preparedInfo",
            "-sourcePath", sourcePath
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<ResultProject> tmp = sc
            .textFile(workingDir.toString() + "/preparedInfo")
            .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));

        org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));

        assertEquals(0, verificationDataset.count());
@ -96,37 +96,37 @@ public class PrepareResultProjectJobTest {
    void testMatchOne() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_one")
            .getPath();

        SparkPrepareResultProject.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/preparedInfo",
            "-sourcePath", sourcePath
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<ResultProject> tmp = sc
            .textFile(workingDir.toString() + "/preparedInfo")
            .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));

        org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));

        assertEquals(1, verificationDataset.count());

        assertEquals(
            1,
            verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());

        verificationDataset.createOrReplaceTempView("table");

        Dataset<Row> check = spark
            .sql(
                "Select projList.provenance.provenance " +
                    "from table " +
                    "lateral view explode (projectsList) pl as projList");

        assertEquals(1, check.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
@ -138,88 +138,88 @@ public class PrepareResultProjectJobTest {
    void testMatch() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match")
            .getPath();

        SparkPrepareResultProject.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/preparedInfo",
            "-sourcePath", sourcePath
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<ResultProject> tmp = sc
            .textFile(workingDir.toString() + "/preparedInfo")
            .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));

        org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));

        assertEquals(2, verificationDataset.count());

        assertEquals(
            1,
            verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());
        assertEquals(
            1,
            verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count());

        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance "
            + "from dataset "
            + "lateral view explode(projectsList) p as MyT ";

        org.apache.spark.sql.Dataset<Row> resultExplodedProvenance = spark.sql(query);
        assertEquals(3, resultExplodedProvenance.count());
        assertEquals(
            2,
            resultExplodedProvenance
                .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
                .count());
        assertEquals(
            1,
            resultExplodedProvenance
                .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
                .count());
        assertEquals(
            2,
            resultExplodedProvenance
                .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'")
                .count());
        assertEquals(
            1,
            resultExplodedProvenance
                .filter(
                    "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
                .count());
        assertEquals(
            1,
            resultExplodedProvenance
                .filter(
                    "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
                .count());
        assertEquals(
            1,
            resultExplodedProvenance
                .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'")
                .count());
        assertEquals(
            1,
            resultExplodedProvenance
                .filter(
                    "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
                .count());
        assertEquals(
            3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
    }
@@ -227,98 +227,121 @@ public class PrepareResultProjectJobTest {
	public void testMatchValidated() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
		assertEquals(2, verificationDataset.count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count());
		verificationDataset.createOrReplaceTempView("dataset");
		String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance, "
			+ "MyT.validated.validatedByFunder, MyT.validated.validationDate "
			+ "from dataset "
			+ "lateral view explode(projectsList) p as MyT ";
		org.apache.spark.sql.Dataset<Row> resultExplodedProvenance = spark.sql(query);
		assertEquals(3, resultExplodedProvenance.count());
		assertEquals(3, resultExplodedProvenance.filter("validatedByFunder = true").count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
				.count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " +
						"and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " +
						"and validatedByFunder = true " +
						"and validationDate = '2021-08-06'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " +
						"and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " +
						"and validatedByFunder = true and validationDate = '2021-08-04'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " +
						"and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " +
						"and validatedByFunder = true and validationDate = '2021-08-05'")
				.count());
		assertEquals(
			3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
	}
+	@Test
+	void testMatchx() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
+			.getPath();
+		SparkPrepareResultProject.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/preparedInfo",
+			"-sourcePath", sourcePath
+		});
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		JavaRDD<ResultProject> tmp = sc
+			.textFile(workingDir.toString() + "/preparedInfo")
+			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
+		tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+	}
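The new testMatchx only prints the prepared records. If an assertion were wanted, a sketch in the style of the other tests in this class could look like the following; it is not part of the commit, and the check is a placeholder rather than a value derived from the /funderresource/match fixture:

// hypothetical assertion: every prepared record should have a distinct resultId
org.apache.spark.sql.Dataset<ResultProject> verification = spark
	.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
assertEquals(verification.count(), verification.select("resultId").distinct().count());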
}
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -34,15 +35,15 @@ public class ResultLinkedToProjectTest {
	private static Path workingDir;
	private static final Logger log = LoggerFactory
		.getLogger(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class);
	private static final HashMap<String, String> map = new HashMap<>();
	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files
			.createTempDirectory(
				eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);
		SparkConf conf = new SparkConf();
@@ -56,10 +57,10 @@ public class ResultLinkedToProjectTest {
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
@@ -72,32 +73,34 @@ public class ResultLinkedToProjectTest {
	void testNoMatch() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch/papers.json")
			.getPath();
		final String graphPath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch")
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
			.getPath();
+		final String communityMapPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
+			.getPath();
		SparkResultLinkedToProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
-			"-graphPath", graphPath
+			"-graphPath", graphPath,
+			"-communityMapPath", communityMapPath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		JavaRDD<Result> tmp = sc
+		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
-			.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
-		org.apache.spark.sql.Dataset<Result> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Result.class));
-		Assertions.assertEquals(0, verificationDataset.count());
+			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
+		Assertions.assertEquals(0, tmp.count());
	}
@@ -105,32 +108,34 @@ public class ResultLinkedToProjectTest {
	void testMatchOne() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match/papers.json")
			.getPath();
-		final String relationPath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
+		final String graphPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
			.getPath();
+		final String communityMapPath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
+			.getPath();
		SparkResultLinkedToProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
-			"-graphPath", relationPath
+			"-graphPath", graphPath,
+			"-communityMapPath", communityMapPath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		JavaRDD<Publication> tmp = sc
+		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
-			.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
-		org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
-		Assertions.assertEquals(1, verificationDataset.count());
+			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
+		Assertions.assertEquals(1, tmp.count());
	}
@@ -5,10 +5,14 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+// import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults2;
+// import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkGetFunderList;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -52,10 +56,10 @@ public class SplitPerFunderTest {
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(SplitPerFunderTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
@@ -68,86 +72,80 @@ public class SplitPerFunderTest {
	void test1() throws Exception {
		final String sourcePath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/extendeddump")
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext")
			.getPath();
		SparkDumpFunderResults.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/split",
-			"-sourcePath", sourcePath,
-			"-graphPath", sourcePath
+			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		// FP7 3
+		// FP7 3 and H2020 3
		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/split/EC_FP7")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class));
		Assertions.assertEquals(3, verificationDataset.count());
		Assertions
			.assertEquals(
				1, verificationDataset.filter("id = '50|dedup_wf_001::0d16b1714ab3077df73893a8ea57d776'").count());
		// CIHR 2
		tmp = sc
			.textFile(workingDir.toString() + "/split/CIHR")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(2, tmp.count());
		// NWO 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NWO")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// NIH 3
		tmp = sc
			.textFile(workingDir.toString() + "/split/NIH")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(2, tmp.count());
		// NSF 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NSF")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// SNSF 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/SNSF")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// NHMRC 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NHMRC")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// H2020 3
		tmp = sc
			.textFile(workingDir.toString() + "/split/EC_H2020")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(3, tmp.count());
		// MZOS 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/MZOS")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
-		// CONICYT 0
-		tmp = sc
-			.textFile(workingDir.toString() + "/split/CONICYTF")
-			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
-		Assertions.assertEquals(0, tmp.count());
	}
}
@@ -0,0 +1 @@
{"ee":"SDSN - Greece","epos":"EPOS","enrmaps":"Energy Research","fet-h2020":"FET H2020","instruct":"Instruct-Eric","egi":"EGI Federation","euromarine":"Euromarine","covid-19":"COVID-19","dariah":"DARIAH EU","rda":"Research Data Alliance","clarin":"CLARIN","aginfra":"Agricultural and Food Sciences","risis":"RISI","fam":"Fisheries and Aquaculture Management","beopen":"Transport Research","elixir-gr":"ELIXIR GR","fet-fp7":"FET FP7","ifremer":"Ifremer","science-innovation-policy":"Science and Innovation Policy Studies","mes":"European Marine Scinece","oa-pg":"EC Post-Grant Open Access Pilot","ni":"Neuroinformatics","dh-ch":"Digital Humanities and Cultural Heritage"}

File diff suppressed because one or more lines are too long
@@ -0,0 +1,8 @@
NSF
CIHR
NWO
NHMRC
NIH
MZOS
SNSF
EC
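The list above enumerates the funder short names used as output folder names by the split step. As a rough illustration only (not code from this commit), splitting a dump of CommunityResult records by funder short name could look like the sketch below; the getter names (getProjects, getFunder, getShortName) are assumptions inferred from the dumped JSON, and the EC case, which the test above expects under EC_FP7 and EC_H2020, would need extra handling not shown here:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;

// illustrative sketch in the spirit of SparkDumpFunderResults
public class SplitPerFunderSketch {

	public static void writeFunder(Dataset<CommunityResult> results, String outputPath, String shortName) {
		results
			.filter(
				(FilterFunction<CommunityResult>) r -> r.getProjects() != null &&
					r
						.getProjects()
						.stream()
						.anyMatch(p -> p.getFunder() != null && shortName.equals(p.getFunder().getShortName())))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/" + shortName);
	}
}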

@@ -0,0 +1 @@
{"resultId":"50|a89337edbe55::43e8b61e5e8d682545cb867be8118585","projectsList":[{"id":"40|aka_________::01bb7b48e29d732a1c7bc5150b9195c4","code":"135027","acronym":null,"title":"Dynamic 3D resolution-enhanced low-coherence interferometric imaging / Consortium: Hi-Lo","funder":{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","fundingStream":null},"provenance":{"provenance":"Harvested","trust":"0.900000000000000022"},"validated":null},{"id":"40|aka_________::9d1af21dbd0f5bc719f71553d19a6b3a","code":"316061","acronym":null,"title":"Finnish Imaging of Degenerative Shoulder Study (FIMAGE): A study on the prevalence of degenerative imaging changes of the shoulder and their relevance to clinical symptoms in the general population.","funder":{"shortName":"AKA","name":"Academy of Finland","jurisdiction":"FI","fundingStream":null},"provenance":{"provenance":"Harvested","trust":"0.900000000000000022"},"validated":null}]}