diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java index 0b602b774..fe9f94960 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java @@ -78,6 +78,7 @@ public class AuthorMerger { a -> a .getPid() .stream() + .filter(Objects::nonNull) .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p))) .map(p -> new Tuple2<>(p, a))) .collect(Collectors.toList()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala new file mode 100644 index 000000000..0886a1fca --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala @@ -0,0 +1,41 @@ +package eu.dnetlib.dhp.sx.graph + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +object SparkConvertDatasetToJsonRDD { + + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + val resultObject = List("publication","dataset","software", "otherResearchProduct") + val mapper = new ObjectMapper() + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + + resultObject.foreach{item => + spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec]) + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index a37dd2132..350b00c5e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -70,7 +70,7 @@ object SparkCreateInputGraph { resultObject.foreach { r => log.info(s"Make ${r._1} unique") - makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2) + makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/preprocess/${r._1}",spark, r._2) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json new file mode 100644 index 000000000..8bfdde5b0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml new file mode 100644 index 000000000..685976ce6 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml @@ -0,0 +1,85 @@ + + + + sourcePath + the working dir base path + + + targetPath + the graph Raw base path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Extract entities in raw graph + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${sourcePath} + --targetPath${targetPath} + + + + + + + + + + + + + + + + + yarn + cluster + Generate Input Graph for deduplication + eu.dnetlib.dhp.sx.graph.SparkConvertDatasetToJsonRDD + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${targetPath}/preprocess + --targetPath${targetPath}/dedup + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml deleted file mode 100644 index 9d06c42d6..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ /dev/null @@ -1,120 +0,0 @@ - - - - workingPath - the working path - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Publication) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -epublication - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Dataset) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -edataset - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Unknown) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -eunknown - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Extract DLI Entities (Relation) - eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=5000 - ${sparkExtraOPT} - - -mt yarn-cluster - --workingPath${workingPath} - -erelation - - - - - - - \ No newline at end of file