forked from antonis.lempesis/dnet-hadoop
collecting all the atoic actions for result type and save them all in the AS path
This commit is contained in:
parent
0051ebede5
commit
45d06c45c7
|
@ -0,0 +1,85 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class CollectAndSave implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static <I extends Result> void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CollectAndSave.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath {}: ", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}: ", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
collectAndSave(spark, inputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
sc
|
||||
.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
||||
.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
|
||||
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
|
||||
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
;
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "rtn",
|
||||
"paramLongName": "resultTableName",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "bsp",
|
||||
"paramLongName": "bipScorePath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,58 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<value>15G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<value>6G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<value>1</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,163 @@
|
|||
<workflow-app name="H2020Programme" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>projectFileURL</name>
|
||||
<description>the url where to get the projects file</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>programmeFileURL</name>
|
||||
<description>the url where to get the programme file</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>topicFileURL</name>
|
||||
<description>the url where to get the topic file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputPath</name>
|
||||
<description>path where to store the action set</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="deleteoutputpath"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
<action name="deleteoutputpath">
|
||||
<fs>
|
||||
<delete path='${outputPath}'/>
|
||||
<mkdir path='${outputPath}'/>
|
||||
<delete path='${workingDir}'/>
|
||||
<mkdir path='${workingDir}'/>
|
||||
</fs>
|
||||
<ok to="get_project_file"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_project_file">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--fileURL</arg><arg>${projectFileURL}</arg>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
|
||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProject</arg>
|
||||
</java>
|
||||
<ok to="get_programme_file"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_programme_file">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV</main-class>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--fileURL</arg><arg>${programmeFileURL}</arg>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
|
||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme</arg>
|
||||
</java>
|
||||
<ok to="get_topic_file"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="get_topic_file">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.project.utils.ReadExcel</main-class>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--fileURL</arg><arg>${topicFileURL}</arg>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/topic</arg>
|
||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic</arg>
|
||||
</java>
|
||||
<ok to="read_projects"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="read_projects">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB</main-class>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/dbProjects</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="prepare_programme"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="prepare_programme">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareProgramme</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProgramme</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--programmePath</arg><arg>${workingDir}/programme</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg>
|
||||
</spark>
|
||||
<ok to="prepare_project"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="prepare_project">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareProjects</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProjects</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--projectPath</arg><arg>${workingDir}/projects</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
|
||||
<arg>--dbProjectPath</arg><arg>${workingDir}/dbProjects</arg>
|
||||
</spark>
|
||||
<ok to="create_updates"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="create_updates">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ProjectProgrammeAS</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--projectPath</arg><arg>${workingDir}/preparedProjects</arg>
|
||||
<arg>--programmePath</arg><arg>${workingDir}/preparedProgramme</arg>
|
||||
<arg>--topicPath</arg><arg>${workingDir}/topic</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
Loading…
Reference in New Issue