diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java deleted file mode 100644 index a48b84a33..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java +++ /dev/null @@ -1,86 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.bipfinder; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Result; - -/** - * Just collects all the atomic actions produced for the different results and saves them in - * outputpath for the ActionSet - */ -public class CollectAndSave implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - Objects - .requireNonNull( - CollectAndSave.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"))); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); - - SparkConf conf = new SparkConf(); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - collectAndSave(spark, inputPath, outputPath); - }); - } - - private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - sc - .sequenceFile(inputPath + "/publication", Text.class, Text.class) - .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)) - .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)) - .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - } - - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index d4dce6ae4..8cda19d07 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -50,7 +50,7 @@ public class SparkAtomicActionScoreJob implements Serializable { .toString( SparkAtomicActionScoreJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json")); + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); @@ -69,14 +69,6 @@ public class SparkAtomicActionScoreJob implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}: ", outputPath); - final String bipScorePath = parser.get("bipScorePath"); - log.info("bipScorePath: {}", bipScorePath); - - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); - - Class<I> inputClazz = (Class<I>) Class.forName(resultClassName); - SparkConf conf = new SparkConf(); runWithSparkSession( @@ -84,12 +76,11 @@ public class SparkAtomicActionScoreJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz); + prepareResults(spark, inputPath, outputPath); }); } - private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath, - String bipScorePath, Class<I> inputClazz) { + private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -105,26 +96,7 @@ public class SparkAtomicActionScoreJob implements Serializable { return bs; }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class)); - Dataset<I> results = readPath(spark, inputPath, inputClazz); - - results.createOrReplaceTempView("result"); - - Dataset<Row> preparedResult = spark - .sql( - "select id " + - "from result " + - - "where dataInfo.deletedbyinference = false "); - bipScores - .joinWith( - preparedResult, bipScores.col("id").equalTo(preparedResult.col("id")), - "inner") - .map((MapFunction<Tuple2<BipScore, Row>, BipScore>) value -> { - BipScore ret = value._1(); - ret.setId(value._1().getId()); - return ret; - }, Encoders.bean(BipScore.class)) .map((MapFunction<BipScore, Result>) bs -> { Result ret = new Result(); @@ -136,7 +108,7 @@ public class SparkAtomicActionScoreJob implements Serializable { return ret; }, Encoders.bean(Result.class)) .toJavaRDD() - .map(p -> new AtomicAction(inputClazz, p)) + .map(p -> new AtomicAction(Result.class, p)) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml index eef429087..d2fe6712c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/oozie_app/workflow.xml @@ -49,6 +49,25 @@ </property> </parameters> + <global> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> + <name>mapreduce.job.queuename</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.queue.name</name> + <value>${oozieLauncherQueueName}</value> + </property> + <property> + <name>oozie.action.sharelib.for.spark</name> + <value>${oozieActionShareLibForSpark2}</value> + </property> + + </configuration> + </global> <start to="deleteoutputpath"/> <kill name="Kill"> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> @@ -64,14 +83,8 @@ <error to="Kill"/> </action> - <fork name="atomicactions"> - <path start="atomicactions_publication"/> - <path start="atomicactions_dataset"/> - <path start="atomicactions_orp"/> - <path start="atomicactions_software"/> - </fork> - <action name="atomicactions_publication"> + <action name="atomicactions"> <spark xmlns="uri:oozie:spark-action:0.2"> <master>yarn</master> <mode>cluster</mode> @@ -88,118 +101,14 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} </spark-opts> - <arg>--inputPath</arg><arg>${inputPath}/publication</arg> - <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> - <arg>--outputPath</arg><arg>${workingDir}/publication</arg> - <arg>--bipScorePath</arg><arg>${bipScorePath}</arg> - </spark> - <ok to="join_aa"/> - <error to="Kill"/> - </action> - - <action name="atomicactions_dataset"> - <spark xmlns="uri:oozie:spark-action:0.2"> - <master>yarn</master> - <mode>cluster</mode> - <name>Produces the atomic action with the bip finder scores for datasets</name> - <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class> - <jar>dhp-aggregation-${projectVersion}.jar</jar> - <spark-opts> - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - </spark-opts> - <arg>--inputPath</arg><arg>${inputPath}/dataset</arg> - <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> - <arg>--outputPath</arg><arg>${workingDir}/dataset</arg> - <arg>--bipScorePath</arg><arg>${bipScorePath}</arg> - </spark> - <ok to="join_aa"/> - <error to="Kill"/> - </action> - - <action name="atomicactions_orp"> - <spark xmlns="uri:oozie:spark-action:0.2"> - <master>yarn</master> - <mode>cluster</mode> - <name>Produces the atomic action with the bip finder scores for orp</name> - <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class> - <jar>dhp-aggregation-${projectVersion}.jar</jar> - <spark-opts> - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - </spark-opts> - <arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg> - <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> - <arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg> - <arg>--bipScorePath</arg><arg>${bipScorePath}</arg> - </spark> - <ok to="join_aa"/> - <error to="Kill"/> - </action> - - <action name="atomicactions_software"> - <spark xmlns="uri:oozie:spark-action:0.2"> - <master>yarn</master> - <mode>cluster</mode> - <name>Produces the atomic action with the bip finder scores for software</name> - <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class> - <jar>dhp-aggregation-${projectVersion}.jar</jar> - <spark-opts> - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - </spark-opts> - <arg>--inputPath</arg><arg>${inputPath}/software</arg> - <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> - <arg>--outputPath</arg><arg>${workingDir}/software</arg> - <arg>--bipScorePath</arg><arg>${bipScorePath}</arg> - </spark> - <ok to="join_aa"/> - <error to="Kill"/> - </action> - - <join name="join_aa" to="collectandsave"/> - - <action name="collectandsave"> - <spark xmlns="uri:oozie:spark-action:0.2"> - <master>yarn</master> - <mode>cluster</mode> - <name>saves all the aa produced for the several types of results in the as output path</name> - <class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class> - <jar>dhp-aggregation-${projectVersion}.jar</jar> - <spark-opts> - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - </spark-opts> - <arg>--inputPath</arg><arg>${workingDir}</arg> + <arg>--inputPath</arg><arg>${bipScorePath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg> </spark> <ok to="End"/> <error to="Kill"/> </action> + + <end name="End"/> </workflow-app> \ No newline at end of file