From 24c56fa7a3584c4a258e9674e089ce36f23b7125 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 19 Nov 2020 19:15:39 +0100
Subject: [PATCH] New logic and workflow for the dump of results linked to
 projects. In this implementation the dumped results match the
 CommunityResult model.

The dump now runs in two phases: SparkResultLinkedToProject first selects,
per result type, the results that are the target of a "produces" relation;
the selected results are then dumped to the CommunityResult model, extended
with project information, and finally partitioned by funder by
SparkDumpFunderResults.
---
 .../oa/graph/dump/funderresults/FunderResults.java |   4 +-
 .../dump/funderresults/SparkDumpFunderResults.java |  44 ++--
 .../funderresults/SparkResultLinkedToProject.java  |  88 +++++
 .../dump/funderresults/oozie_app/workflow.xml      | 365 +++++++++++++++++-
 .../oa/graph/dump/input_parameters_link_prj.json   |  36 ++
 5 files changed, 509 insertions(+), 28 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
index c3d2d4c2c..138e262e4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/FunderResults.java
@@ -3,9 +3,9 @@ package eu.dnetlib.dhp.oa.graph.dump.funderresults;
 
 import java.io.Serializable;
 
-import eu.dnetlib.dhp.schema.dump.oaf.Result;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 
-public class FunderResults extends Result implements Serializable {
+public class FunderResults extends CommunityResult implements Serializable {
     private String funder_id;
 
     public String getFunder_id() {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index f84a5fd11..d4e294735 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -17,9 +17,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import scala.Tuple2;
@@ -53,8 +54,8 @@ public class SparkDumpFunderResults implements Serializable {
         final String outputPath = parser.get("outputPath");
         log.info("outputPath: {}", outputPath);
 
-        final String communityMapPath = parser.get("communityMapPath");
-        log.info("communityMapPath: {}", communityMapPath);
+        final String relationPath = parser.get("relationPath");
+        log.info("relationPath: {}", relationPath);
 
         SparkConf conf = new SparkConf();
 
@@ -63,36 +64,35 @@ public class SparkDumpFunderResults implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 Utils.removeOutputDir(spark, outputPath);
-                writeResultProjectList(spark, inputPath, outputPath, communityMapPath);
+                writeResultProjectList(spark, inputPath, outputPath, relationPath);
             });
     }
 
-    private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
-        String communityMapPath) {
-
-        CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
+    private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath, String relationPath) {
 
         Dataset<Relation> relation = Utils
-            .readPath(spark, inputPath + "/relation", Relation.class)
+            .readPath(spark, relationPath, Relation.class)
             .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
 
-        Dataset<eu.dnetlib.dhp.schema.oaf.Result> result = Utils
-            .readPath(spark, inputPath + "/publication", eu.dnetlib.dhp.schema.oaf.Result.class)
-            .union(Utils.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Result.class))
-            .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", eu.dnetlib.dhp.schema.oaf.Result.class))
-            .union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
+        // the results were already dumped to the CommunityResult model upstream; read them
+        // as FunderResults (a CommunityResult plus funder_id) so the join only sets the funder
+        Dataset<FunderResults> result = Utils
+            .readPath(spark, inputPath + "/publication", FunderResults.class)
+            .union(Utils.readPath(spark, inputPath + "/dataset", FunderResults.class))
+            .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", FunderResults.class))
+            .union(Utils.readPath(spark, inputPath + "/software", FunderResults.class));
 
         result
             .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
-            .map((MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, FunderResults>) value -> {
-                FunderResults res = (FunderResults) ResultMapper.map(value._1(), communityMap, false);
-                res.setFunder_id(value._2().getSource().substring(3, 15));
-                return res;
-            }, Encoders.bean(FunderResults.class))
-            .write()
-            .partitionBy("funder_id")
-            .mode(SaveMode.Overwrite)
-            .json(outputPath);
+            .map((MapFunction<Tuple2<FunderResults, Relation>, FunderResults>) value -> {
+                FunderResults res = value._1();
+                // the project id carries a 3-character namespace prefix; the next 12 characters identify the funder
+                res.setFunder_id(value._2().getSource().substring(3, 15));
+                return res;
+            }, Encoders.bean(FunderResults.class))
+            .write()
+            .partitionBy("funder_id")
+            .mode(SaveMode.Overwrite)
+            .json(outputPath);
     }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
new file mode 100644
index 000000000..456e44447
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
@@ -0,0 +1,88 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.funderresults;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple2;
+
+public class SparkResultLinkedToProject implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkResultLinkedToProject.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkResultLinkedToProject.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        final String relationPath = parser.get("relationPath");
+        log.info("relationPath: {}", relationPath);
+
+        Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, relationPath);
+            });
+    }
+
+    private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
+        String inputPath, String outputPath, String relationPath) {
+
+        Dataset<R> results = Utils.readPath(spark, inputPath, inputClazz);
+        Dataset<Relation> relations = Utils
+            .readPath(spark, relationPath, Relation.class)
+            .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
+        relations
+            .joinWith(
+                results, relations.col("target").equalTo(results.col("id")),
+                "inner")
+            // keep only the result side of the tuple: the downstream dump steps expect
+            // plain results as input, not (relation, result) pairs
+            .map((MapFunction<Tuple2<Relation, R>, R>) value -> value._2(), Encoders.bean(inputClazz))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
index aca8f9714..d0acdf051 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/funderresults/oozie_app/workflow.xml
@@ -127,10 +127,367 @@
             --nameNode${nameNode}
             --isLookUpUrl${isLookUpUrl}
-
+
+
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Select publication results linked to projects
+            eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${sourcePath}/publication
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication
+            --outputPath${workingDir}/result/publication
+            --relationPath${sourcePath}/relation
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Select dataset results linked to projects
+            eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${sourcePath}/dataset
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset
+            --outputPath${workingDir}/result/dataset
+            --relationPath${sourcePath}/relation
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Select ORP results linked to projects
+            eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${sourcePath}/otherresearchproduct
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+            --outputPath${workingDir}/result/otherresearchproduct
+            --relationPath${sourcePath}/relation
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Select software results linked to projects
+            eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${sourcePath}/software
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Software
+            --outputPath${workingDir}/result/software
+            --relationPath${sourcePath}/relation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Dump table publication for community related products
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/result/publication
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication
+            --outputPath${workingDir}/dump/publication
+            --communityMapPath${workingDir}/communityMap
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Dump table dataset for community related products
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/result/dataset
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset
+            --outputPath${workingDir}/dump/dataset
+            --communityMapPath${workingDir}/communityMap
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Dump table ORP for community related products
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/result/otherresearchproduct
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+            --outputPath${workingDir}/dump/otherresearchproduct
+            --communityMapPath${workingDir}/communityMap
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Dump table software for community related products
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/result/software
+            --resultTableNameeu.dnetlib.dhp.schema.oaf.Software
+            --outputPath${workingDir}/dump/software
+            --communityMapPath${workingDir}/communityMap
+
+
+
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Prepare association result subset of project info
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${sourcePath}
+            --outputPath${workingDir}/preparedInfo
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Extend dumped publications with information about project
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/dump/publication
+            --outputPath${workingDir}/ext/publication
+            --preparedInfoPath${workingDir}/preparedInfo
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Extend dumped dataset with information about project
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/dump/dataset
+            --outputPath${workingDir}/ext/dataset
+            --preparedInfoPath${workingDir}/preparedInfo
+
+
+
+
+
+
+
+            yarn
+            cluster
+            Extend dumped ORP with information about project
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/dump/otherresearchproduct
+            --outputPath${workingDir}/ext/otherresearchproduct
+            --preparedInfoPath${workingDir}/preparedInfo
+
+
+
+
+
+
+            yarn
+            cluster
+            Extend dumped software with information about project
+            eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo
+            dhp-graph-mapper-${projectVersion}.jar
+
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+            --sourcePath${workingDir}/dump/software
+            --outputPath${workingDir}/ext/software
+            --preparedInfoPath${workingDir}/preparedInfo
+
+
+
+
@@ -149,9 +506,9 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
-            --sourcePath${sourcePath}
-            --outputPath${workingDir}/result
-            --communityMapPath${workingDir}/communityMap
+            --sourcePath${workingDir}/ext
+            --outputPath${outputPath}
+            --relationPath${sourcePath}/relation
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json
new file mode 100644
index 000000000..b1f4c026a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json
@@ -0,0 +1,36 @@
+[
+
+  {
+    "paramName": "rp",
+    "paramLongName": "relationPath",
+    "paramDescription": "the path where the graph relations are stored",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  },
+  {
+    "paramName": "tn",
+    "paramLongName": "resultTableName",
+    "paramDescription": "the name of the result table we are currently working on",
+    "paramRequired": true
+  }
+]
+
+
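
Reviewer note: below is a minimal, self-contained sketch of the join-and-partition
pattern the two jobs above implement, kept outside the patch for discussion.
Everything in it that is not visible in the diff is an illustrative assumption,
not the module's API: the FunderDumpSketch class name, the local "graph/..." and
"dump/..." paths, and the use of untyped Row datasets instead of the module's
typed Utils.readPath readers.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.substring;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class FunderDumpSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("FunderDumpSketch")
            .master("local[*]")
            .getOrCreate();

        // "produces" relations link a project (source) to a result (target);
        // relations deleted by inference are discarded, as in the patch.
        // Aliasing the two columns up front avoids name clashes after the join
        // (both relations and results carry a dataInfo field).
        Dataset<Row> links = spark
            .read()
            .json("graph/relation") // hypothetical local path
            .filter("dataInfo.deletedbyinference = false and relClass = 'produces'")
            .select(col("source").alias("projectId"), col("target").alias("resultId"));

        Dataset<Row> results = spark.read().json("graph/publication"); // hypothetical

        // Step 1 (SparkResultLinkedToProject): the inner join keeps only results
        // linked to at least one project.
        // Step 2 (SparkDumpFunderResults): the funder id is the 12-character slice
        // of the project id that follows the 3-character namespace prefix --
        // substring(3, 15) in the Java code, substring(pos=4, len=12) in the
        // 1-based SQL form -- and the dump is written partitioned by it.
        results
            .join(links, results.col("id").equalTo(col("resultId")), "inner")
            .withColumn("funder_id", substring(col("projectId"), 4, 12))
            .drop("projectId", "resultId")
            .write()
            .partitionBy("funder_id")
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json("dump/funder-results");

        spark.stop();
    }
}

As in the patch's joinWith, a result linked to n projects is emitted n times,
once under each matching funder partition, which is what a per-funder dump needs.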