code, workflow and parameters for the dump of the results associated to funders

pull/61/head
Miriam Baglioni 3 years ago
parent 57cac36898
commit 46ba3793f6

@ -0,0 +1,19 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
public class FunderResults implements Serializable {
private List<Result> results;
public List<Result> getResults() {
return results;
}
public void setResults(List<Result> results) {
this.results = results;
}
}

@ -0,0 +1,165 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
/**
* Preparation of the Project information to be added to the dumped results. For each result associated to at least one
* Project, a serialization of an instance af ResultProject closs is done. ResultProject contains the resultId, and the
* list of Projects (as in eu.dnetlib.dhp.schema.dump.oaf.community.Project) it is associated to
*/
public class SparkPrepareResultProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkPrepareResultProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/project_prep_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String communityMapPath = parser.get("communityMapPath");
log.info("communityMapPath: {}", communityMapPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
prepareResultProjectList2(spark, inputPath, outputPath, communityMapPath);
});
}
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath,
String communityMapPath) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<eu.dnetlib.dhp.schema.oaf.Result> result = Utils
.readPath(spark, inputPath + "/publication", eu.dnetlib.dhp.schema.oaf.Result.class)
.union(Utils.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
result
.joinWith(relation, result.col("id").equalTo(relation.col("target")))
.groupByKey(
(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) value -> value
._2()
.getSource()
.substring(3, 15),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, Tuple2<String, FunderResults>>) (
s, it) -> {
Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation> first = it.next();
FunderResults fr = new FunderResults();
List<Result> resultList = new ArrayList<>();
resultList.add(ResultMapper.map(first._1(), communityMap, true));
it.forEachRemaining(c -> {
resultList.add(ResultMapper.map(c._1(), communityMap, true));
});
fr.setResults(resultList);
return new Tuple2<>(s, fr);
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(FunderResults.class)))
.foreach(t -> {
String funder = t._1();
spark
.createDataFrame(t._2.getResults(), Result.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + funder);
});
}
private static void prepareResultProjectList2(SparkSession spark, String inputPath, String outputPath,
String communityMapPath) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<eu.dnetlib.dhp.schema.oaf.Result> result = Utils
.readPath(spark, inputPath + "/publication", eu.dnetlib.dhp.schema.oaf.Result.class)
.union(Utils.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
result
.joinWith(relation, result.col("id").equalTo(relation.col("target")))
.groupByKey(
(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) value -> value
._2()
.getSource()
.substring(3, 15),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) (s, it) -> {
Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation> first = it.next();
List<Result> resultList = new ArrayList<>();
resultList.add(ResultMapper.map(first._1(), communityMap, true));
it.forEachRemaining(c -> {
resultList.add(ResultMapper.map(c._1(), communityMap, true));
});
spark
.createDataFrame(resultList, Result.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + s);
return new String();
}, Encoders.STRING());
}
}

@ -0,0 +1,26 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "cmp",
"paramLongName": "communityMapPath",
"paramDescription": "the community map path",
"paramRequired": true
}
]

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

@ -0,0 +1,179 @@
<workflow-app name="dump_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>accessToken</name>
<description>the access token used for the deposition in Zenodo</description>
</property>
<property>
<name>connectionUrl</name>
<description>the connection url for Zenodo</description>
</property>
<property>
<name>metadata</name>
<description> the metadata associated to the deposition</description>
</property>
<property>
<name>depositionType</name>
<description>the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided)</description>
</property>
<property>
<name>conceptRecordId</name>
<description>for new version, the id of the record for the old deposition</description>
</property>
<property>
<name>depositionId</name>
<description>the depositionId of a deposition open that has to be added content</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="save_community_map"/>
<error to="Kill"/>
</action>
<action name="save_community_map">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap</main-class>
<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="dump_funder_results"/>
<error to="Kill"/>
</action>
<action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/result</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Loading…
Cancel
Save