forked from D-Net/dnet-hadoop
changed the way to produce the AS for bipFinder.
This commit is contained in:
parent
4eb8276493
commit
6fb6236cd4
|
@ -1,86 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Just collects all the atomic actions produced for the different results and saves them in
|
|
||||||
* outputpath for the ActionSet
|
|
||||||
*/
|
|
||||||
public class CollectAndSave implements Serializable {
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
|
||||||
.toString(
|
|
||||||
Objects
|
|
||||||
.requireNonNull(
|
|
||||||
CollectAndSave.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")));
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
||||||
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
final String inputPath = parser.get("inputPath");
|
|
||||||
log.info("inputPath {}: ", inputPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath {}: ", outputPath);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
removeOutputDir(spark, outputPath);
|
|
||||||
collectAndSave(spark, inputPath, outputPath);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
sc
|
|
||||||
.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
|
||||||
.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
|
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
|
||||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -50,7 +50,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
.toString(
|
.toString(
|
||||||
SparkAtomicActionScoreJob.class
|
SparkAtomicActionScoreJob.class
|
||||||
.getResourceAsStream(
|
.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));
|
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
|
@ -69,14 +69,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath {}: ", outputPath);
|
log.info("outputPath {}: ", outputPath);
|
||||||
|
|
||||||
final String bipScorePath = parser.get("bipScorePath");
|
|
||||||
log.info("bipScorePath: {}", bipScorePath);
|
|
||||||
|
|
||||||
final String resultClassName = parser.get("resultTableName");
|
|
||||||
log.info("resultTableName: {}", resultClassName);
|
|
||||||
|
|
||||||
Class<I> inputClazz = (Class<I>) Class.forName(resultClassName);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
|
@ -84,12 +76,11 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz);
|
prepareResults(spark, inputPath, outputPath);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
|
private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
|
||||||
String bipScorePath, Class<I> inputClazz) {
|
|
||||||
|
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
@ -105,26 +96,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
return bs;
|
return bs;
|
||||||
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
|
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
|
||||||
|
|
||||||
Dataset<I> results = readPath(spark, inputPath, inputClazz);
|
|
||||||
|
|
||||||
results.createOrReplaceTempView("result");
|
|
||||||
|
|
||||||
Dataset<Row> preparedResult = spark
|
|
||||||
.sql(
|
|
||||||
"select id " +
|
|
||||||
"from result " +
|
|
||||||
|
|
||||||
"where dataInfo.deletedbyinference = false ");
|
|
||||||
|
|
||||||
bipScores
|
bipScores
|
||||||
.joinWith(
|
|
||||||
preparedResult, bipScores.col("id").equalTo(preparedResult.col("id")),
|
|
||||||
"inner")
|
|
||||||
.map((MapFunction<Tuple2<BipScore, Row>, BipScore>) value -> {
|
|
||||||
BipScore ret = value._1();
|
|
||||||
ret.setId(value._1().getId());
|
|
||||||
return ret;
|
|
||||||
}, Encoders.bean(BipScore.class))
|
|
||||||
|
|
||||||
.map((MapFunction<BipScore, Result>) bs -> {
|
.map((MapFunction<BipScore, Result>) bs -> {
|
||||||
Result ret = new Result();
|
Result ret = new Result();
|
||||||
|
@ -136,7 +108,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
return ret;
|
return ret;
|
||||||
}, Encoders.bean(Result.class))
|
}, Encoders.bean(Result.class))
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.map(p -> new AtomicAction(inputClazz, p))
|
.map(p -> new AtomicAction(Result.class, p))
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||||
|
|
|
@ -49,6 +49,25 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
<start to="deleteoutputpath"/>
|
<start to="deleteoutputpath"/>
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
@ -64,14 +83,8 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<fork name="atomicactions">
|
|
||||||
<path start="atomicactions_publication"/>
|
|
||||||
<path start="atomicactions_dataset"/>
|
|
||||||
<path start="atomicactions_orp"/>
|
|
||||||
<path start="atomicactions_software"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="atomicactions_publication">
|
<action name="atomicactions">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
|
@ -88,118 +101,14 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
|
<arg>--inputPath</arg><arg>${bipScorePath}</arg>
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_dataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for datasets</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_orp">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for orp</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_software">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for software</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="join_aa" to="collectandsave"/>
|
|
||||||
|
|
||||||
<action name="collectandsave">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>saves all the aa produced for the several types of results in the as output path</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${workingDir}</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
</workflow-app>
|
</workflow-app>
|
Loading…
Reference in New Issue