diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java new file mode 100644 index 000000000..67da24185 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -0,0 +1,80 @@ +package eu.dnetlib.dhp.oa.graph.dump.projectssubset; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; +import scala.Tuple2; +public class ProjectsSubsetSparkJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(ProjectsSubsetSparkJob.class); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ProjectsSubsetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final String projectListPath = parser.get("projectListPath"); + log.info("projectListPath: {}", projectListPath); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + getNewProjectList(spark, inputPath, outputPath, projectListPath); + }); + } + private static void getNewProjectList(SparkSession spark, String inputPath, String outputPath, + String projectListPath) { + Dataset projectList = spark.read().textFile(projectListPath); + // projectList.show(false); + Dataset projects; + projects = Utils.readPath(spark, inputPath, Project.class); + projects + .joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left") + .map((MapFunction, Project>) t2 -> { + if (Optional.ofNullable(t2._2()).isPresent()) + return null; + return t2._1(); + }, Encoders.bean(Project.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + Utils + .readPath(spark, outputPath, Project.class) + .map((MapFunction) p -> p.getId(), Encoders.STRING()) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(projectListPath); + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json new file mode 100644 index 000000000..ed2313680 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_subset_parameters.json @@ -0,0 +1,27 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + }, + { + "paramName": "pl", + "paramLongName": "projectListPath", + "paramDescription": "the path of the association result projectlist", + "paramRequired": true + } +] + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml new file mode 100644 index 000000000..619e3aa20 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/projectsubset/oozie_app/workflow.xml @@ -0,0 +1,171 @@ + + + + sourcePath + the source path + + + projectListPath + the path to the project list + + + outputPath + the output path + + + accessToken + the access token used for the deposition in Zenodo + + + connectionUrl + the connection url for Zenodo + + + metadata + the metadata associated to the deposition + + + depositionType + the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided) + + + conceptRecordId + for new version, the id of the record for the old deposition + + + depositionId + the depositionId of a deposition open that has to be added content + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + yarn + cluster + Dump table project + eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath}/project + --resultTableNameeu.dnetlib.dhp.schema.oaf.Project + --outputPath${workingDir}/project + --communityMapPathnoneed + + + + + + + yarn + cluster + Dump table project + eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectsSubsetSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/project + --outputPath${workingDir}/tar/project + --projectListPath${projectListPath} + + + + + + + eu.dnetlib.dhp.oa.graph.dump.MakeTar + --hdfsPath${outputPath} + --nameNode${nameNode} + --sourcePath${workingDir}/tar + + + + + + + eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS + --hdfsPath${outputPath} + --nameNode${nameNode} + --accessToken${accessToken} + --connectionUrl${connectionUrl} + --metadata${metadata} + --conceptRecordId${conceptRecordId} + --depositionType${depositionType} + --depositionId${depositionId} + + + + + + \ No newline at end of file