diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
index 87ac20948..c277cdda1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
@@ -10,6 +10,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -79,12 +80,11 @@ public class SparkPrepareResultProject implements Serializable {
 
     private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
         Dataset<Relation> relation = Utils
-            .readPath(spark, inputPath + "/relation", Relation.class)
-            .filter(
-                "dataInfo.deletedbyinference = false and lower(relClass) = '"
-                    + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
-        Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
-            .readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
+            .readPath(spark, inputPath + "/relation", Relation.class)
+            .filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+                r.getRelClass().equalsIgnoreCase(ModelConstants.IS_PRODUCED_BY));
+
+        Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
 
         projects
             .joinWith(relation, projects.col("id").equalTo(relation.col("target")), "inner")
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index 6aed9d7a8..03a209220 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -76,8 +76,7 @@ public class SparkDumpFunderResults implements Serializable {
             .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
 
         List<String> funderList = project
-            .select("id")
-            .map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
+            .map((MapFunction<Project, String>) p -> p.getId(), Encoders.STRING())
             .distinct()
             .collectAsList();
 
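The switch in SparkPrepareResultProject from a SQL-string `filter` to a typed `FilterFunction` trades Catalyst-side predicate parsing for compile-time checking of the accessed fields. A minimal self-contained sketch of the same pattern, using hypothetical simplified stand-ins for the OAF `Relation` bean (the real classes live in `eu.dnetlib.dhp.schema.oaf`):

```java
import java.io.Serializable;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

public class TypedFilterSketch {

    // Hypothetical, simplified stand-ins for the OAF model beans.
    public static class DataInfo implements Serializable {
        private Boolean deletedbyinference;

        public Boolean getDeletedbyinference() { return deletedbyinference; }

        public void setDeletedbyinference(Boolean d) { deletedbyinference = d; }
    }

    public static class Relation implements Serializable {
        private DataInfo dataInfo;
        private String relClass;

        public DataInfo getDataInfo() { return dataInfo; }

        public void setDataInfo(DataInfo di) { dataInfo = di; }

        public String getRelClass() { return relClass; }

        public void setRelClass(String rc) { relClass = rc; }
    }

    public static Dataset<Relation> keepIsProducedBy(Dataset<Relation> relation) {
        // Before: relation.filter("dataInfo.deletedbyinference = false and lower(relClass) = 'isproducedby'")
        // After: the getters below are checked by the compiler, so a typo fails the build
        // instead of failing at query-analysis time on the cluster.
        return relation
            .filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
                && r.getRelClass().equalsIgnoreCase("isProducedBy"));
    }
}
```

The cast to `FilterFunction<Relation>` is what disambiguates the lambda among the `filter(String)`, `filter(Column)`, and `filter(FilterFunction)` overloads. Note the trade-off: the typed variant deserializes every row into the bean, giving up the column pruning and predicate pushdown that a string predicate could exploit.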
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
index d89b9e86d..a6997ce2e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
@@ -6,8 +6,15 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.Serializable;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -23,6 +30,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple2;
 
 /**
  * Selects the results linked to projects. Only for these results the dump will be performed.
@@ -58,8 +66,10 @@ public class SparkResultLinkedToProject implements Serializable {
         final String resultClassName = parser.get("resultTableName");
         log.info("resultTableName: {}", resultClassName);
 
-        final String graphPath = parser.get("graphPath");
-        log.info("graphPath: {}", graphPath);
+        final String resultProjectsPath = parser.get("graphPath");
+        log.info("graphPath: {}", resultProjectsPath);
+
+        String communityMapPath = parser.get("communityMapPath");
 
         @SuppressWarnings("unchecked")
         Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
@@ -70,43 +80,27 @@ public class SparkResultLinkedToProject implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 Utils.removeOutputDir(spark, outputPath);
-                writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, graphPath);
+                writeResultsLinkedToProjects(communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
             });
     }
 
-    private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
-        String inputPath, String outputPath, String graphPath) {
+    private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark,
+        Class<R> inputClazz, String inputPath, String outputPath, String resultProjectsPath) {
 
         Dataset<R> results = Utils
             .readPath(spark, inputPath, inputClazz)
-            .filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
-        Dataset<Relation> relations = Utils
-            .readPath(spark, graphPath + "/relation", Relation.class)
-            .filter(
-                "dataInfo.deletedbyinference = false and lower(relClass) = '"
-                    + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
-        Dataset<Project> project = Utils.readPath(spark, graphPath + "/project", Project.class);
+            .filter((FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
+                !r.getDataInfo().getInvisible());
+        Dataset<ResultProject> resultProjectDataset = Utils
+            .readPath(spark, resultProjectsPath, ResultProject.class);
+        CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
+        results.joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
+            .map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 -> {
+                CommunityResult cr = (CommunityResult) ResultMapper.map(t2._1(),
+                    communityMap, Constants.DUMPTYPE.FUNDER.getType());
+                cr.setProjects(t2._2().getProjectsList());
+                return cr;
+            }, Encoders.bean(CommunityResult.class))
 
-        results.createOrReplaceTempView("result");
-        relations.createOrReplaceTempView("relation");
-        project.createOrReplaceTempView("project");
-
-        Dataset<R> tmp = spark
-            .sql(
-                "Select res.* " +
-                    "from relation rel " +
-                    "join result res " +
-                    "on rel.source = res.id " +
-                    "join project p " +
-                    "on rel.target = p.id " +
-                    "")
-            .as(Encoders.bean(inputClazz));
-        tmp
-            .groupByKey(
-                (MapFunction<R, String>) value -> value
-                    .getId(),
-                Encoders.STRING())
-            .mapGroups((MapGroupsFunction<String, R, R>) (k, it) -> it.next(), Encoders.bean(inputClazz))
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
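The rewritten `writeResultsLinkedToProjects` replaces three temp views and a SQL join with a `joinWith` over the pre-computed result/project associations: an inner `joinWith` yields a `Dataset<Tuple2<left, right>>`, and a single `map` assembles the output bean from both sides. A compact sketch of that shape on hypothetical simplified beans (the real `ResultProject` carries a list of project objects rather than a string):

```java
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

import scala.Tuple2;

public class JoinWithSketch {

    // Hypothetical simplified beans standing in for Result, ResultProject, CommunityResult.
    public static class ResultBean implements Serializable {
        private String id;

        public String getId() { return id; }

        public void setId(String id) { this.id = id; }
    }

    public static class ResultProjectBean implements Serializable {
        private String resultId;
        private String projects; // stands in for the real list of projects

        public String getResultId() { return resultId; }

        public void setResultId(String r) { resultId = r; }

        public String getProjects() { return projects; }

        public void setProjects(String p) { projects = p; }
    }

    public static class OutBean implements Serializable {
        private String id;
        private String projects;

        public String getId() { return id; }

        public void setId(String id) { this.id = id; }

        public String getProjects() { return projects; }

        public void setProjects(String p) { projects = p; }
    }

    public static Dataset<OutBean> link(Dataset<ResultBean> results, Dataset<ResultProjectBean> rp) {
        return results
            // joinWith defaults to an inner join: results with no project match drop out,
            // which is exactly the "linked to project" selection this class performs
            .joinWith(rp, results.col("id").equalTo(rp.col("resultId")))
            .map((MapFunction<Tuple2<ResultBean, ResultProjectBean>, OutBean>) t2 -> {
                OutBean out = new OutBean();
                out.setId(t2._1().getId());
                out.setProjects(t2._2().getProjects());
                return out;
            }, Encoders.bean(OutBean.class));
    }
}
```

Because the join is inner, results without an `isProducedBy` relation simply disappear from the output. The old `groupByKey`/`mapGroups` de-duplication step also becomes unnecessary once `SparkPrepareResultProject` has collapsed the associations to one `ResultProject` row per result.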
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameter_select_relation.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameter_select_relation.json
new file mode 100644
index 000000000..4a2e8159c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_parameter_select_relation.json
@@ -0,0 +1,20 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  }
+]
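For context, descriptor files like this one are consumed by the argument parser in `dhp-common`: each job loads the JSON from its classpath and binds it to the CLI arguments passed by the Oozie `<arg>` elements. A sketch of the usual pattern — class and method names as used elsewhere in this codebase, so treat the exact signatures as an assumption:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class SelectRelationArgsSketch {

    public static void main(String[] args) throws Exception {
        // Load the parameter descriptors packaged with the jar...
        String jsonConf = IOUtils
            .toString(
                SelectRelationArgsSketch.class
                    .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/input_parameter_select_relation.json"),
                StandardCharsets.UTF_8);

        // ...and bind them to the CLI arguments provided by the Oozie spark action.
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
        parser.parseArgument(args);

        String sourcePath = parser.get("sourcePath"); // required parameter
        String outputPath = parser.get("outputPath"); // required parameter

        // Optional parameters come back as null when absent; jobs default them explicitly.
        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
    }
}
```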
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app/workflow.xml
index 5f954ff38..1a0c71a71 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app/workflow.xml
@@ -92,8 +92,6 @@
 [hunk body not recoverable: the XML markup was stripped during extraction; two tag-only lines are removed here]
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml
index b990f4e49..3270445c4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml
@@ -77,12 +77,59 @@
 
-    <start to="…"/>
+    <start to="prepareResultProject"/>
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
+    <action name="prepareResultProject">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Prepare association result subset of project info</name>
+            <class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
+        </spark>
+        <ok to="…"/>
+        <error to="Kill"/>
+    </action>
+    [remaining added lines not recoverable: tag-only XML (the fork over the four dump actions) was stripped during extraction]
@@ -111,7 +158,7 @@
             <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
-            <arg>--graphPath</arg><arg>${sourcePath}</arg>
+            <arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
         </spark>
         <ok to="…"/>
         <error to="Kill"/>
@@ -137,7 +184,7 @@
             <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
-            <arg>--graphPath</arg><arg>${sourcePath}</arg>
+            <arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
         </spark>
         <ok to="…"/>
         <error to="Kill"/>
@@ -163,7 +210,7 @@
             <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
             <arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
-            <arg>--graphPath</arg><arg>${sourcePath}</arg>
+            <arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
         </spark>
         <ok to="…"/>
         <error to="Kill"/>
@@ -189,41 +236,41 @@
             <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
             <arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
-            <arg>--graphPath</arg><arg>${sourcePath}</arg>
+            <arg>--graphPath</arg><arg>${workingDir}/preparedInfo</arg>
         </spark>
-        <ok to="…"/>
+        <ok to="…"/>
         <error to="Kill"/>
     </action>
 
-    <action name="…">
-        <sub-workflow>
-            <app-path>${wf:appPath()}/dump_common</app-path>
-            <propagate-configuration/>
-            <configuration>
-                <property>
-                    <name>sourcePath</name>
-                    <value>${sourcePath}</value>
-                </property>
-                <property>
-                    <name>selectedResults</name>
-                    <value>${workingDir}/result</value>
-                </property>
-                <property>
-                    <name>communityMapPath</name>
-                    <value>${workingDir}/communityMap</value>
-                </property>
-                <property>
-                    <name>outputPath</name>
-                    <value>${workingDir}</value>
-                </property>
-            </configuration>
-        </sub-workflow>
-        <ok to="…"/>
-        <error to="Kill"/>
-    </action>
+    [replacement block not recoverable: the added lines were tag-only XML wiring and their content was stripped during extraction]
@@ -242,7 +289,7 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
         </spark-opts>
-        <arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
+        <arg>--sourcePath</arg><arg>${workingDir}/result</arg>
        <arg>--outputPath</arg><arg>${outputPath}</arg>
        <arg>--graphPath</arg><arg>${sourcePath}</arg>
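Every step in this change reads and writes typed `Dataset`s through `Utils.readPath` and the `write().mode(SaveMode.Overwrite).option("compression", "gzip")` tail visible in the diffs. As orientation, a sketch of what a `readPath` helper of this kind presumably looks like — gzip-compressed JSON lines decoded with Jackson into a bean `Dataset` (assumption: the repo's actual `Utils` may differ in detail):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ReadPathSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Presumed shape of Utils.readPath: the dump jobs write one JSON object per line
    // with gzip compression, so reading back is textFile + Jackson. Spark decompresses
    // the gzip parts transparently based on the file extension.
    public static <R> Dataset<R> readPath(SparkSession spark, String path, Class<R> clazz) {
        return spark
            .read()
            .textFile(path)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }
}
```

This is why the `--graphPath` arguments in the workflow can simply be repointed from `${sourcePath}` to `${workingDir}/preparedInfo`: the dump actions deserialize whatever `ResultProject` records the new `prepareResultProject` step materialized there.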