From 5b76321d9cb58f68a52ee01d14ea77f38e81d749 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Wed, 20 Jul 2022 16:34:32 +0200
Subject: [PATCH] implemented Oozie workflow to generate the Scholix dump,
 filtering by relClass semantics
---
 .../sx/graph/convert_dataset_json_params.json |   1 +
 .../sx/graph/convert_object_json_params.json  |   4 +-
 .../dumpScholix/oozie_app/config-default.xml  |  10 ++
 .../graph/dumpScholix/oozie_app/workflow.xml  | 145 ++++++++++++++++++
 .../sx/graph/SparkConvertObjectToJson.scala   |  33 +++-
 .../sx/graph/SparkConvertRDDtoDataset.scala   |  61 +++++---
 6 files changed, 228 insertions(+), 26 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
index 8bfdde5b0..f3e8cdbad 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
@@ -2,4 +2,5 @@
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
   {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
   {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+  {"paramName":"r", "paramLongName":"filterRelation", "paramDescription": "the relation to filter", "paramRequired": false}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
index 890570a0b..cbb20bfe7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
@@ -3,5 +3,7 @@
   {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
   {"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false},
   {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
-  {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
+  {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true},
+  {"paramName":"mp", "paramLongName":"maxPidNumberFilter", "paramDescription": "filter max number of pids in source/target", "paramRequired": false}
+
 ]
\ No newline at end of file
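Both new parameters are optional, so the jobs must tolerate a missing or malformed value. The sketch below is illustrative only and not part of the patch: it shows one defensive way to read the new optional arguments, assuming the ArgumentApplicationParser/parseArgument pattern used by the classes touched later in this patch; the helper object and method names are hypothetical.

    // Illustrative sketch: defensive reading of the new optional CLI arguments.
    // "filterRelation" belongs to convert_dataset_json_params.json (SparkConvertRDDtoDataset),
    // "maxPidNumberFilter" to convert_object_json_params.json (SparkConvertObjectToJson).
    import eu.dnetlib.dhp.application.ArgumentApplicationParser
    import org.apache.commons.io.IOUtils
    import org.apache.commons.lang3.StringUtils

    import scala.util.Try

    object OptionalArgs {

      // Hypothetical helper: build a parser from one of the JSON descriptors above.
      def parserFor(resource: String, args: Array[String]): ArgumentApplicationParser = {
        val parser = new ArgumentApplicationParser(
          IOUtils.toString(getClass.getResourceAsStream(resource))
        )
        parser.parseArgument(args)
        parser
      }

      // Optional string argument: present only when non-blank.
      def optString(parser: ArgumentApplicationParser, name: String): Option[String] =
        Option(parser.get(name)).filter(StringUtils.isNotBlank)

      // Optional numeric argument: present only when it parses as an Int.
      def optInt(parser: ArgumentApplicationParser, name: String): Option[Int] =
        Option(parser.get(name)).flatMap(s => Try(s.toInt).toOption)
    }

In the touched jobs themselves the same effect is achieved with a null/blank check for filterRelation and the toInt helper for maxPidNumberFilter, as the Scala diffs below show.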
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml
new file mode 100644
index 000000000..6fb2a1253
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+<configuration>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
new file mode 100644
index 000000000..a37d85ad4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
@@ -0,0 +1,145 @@
+
+
+
+ sourcePath
+ the working dir base path
+
+
+ targetPath
+ the final graph path
+
+
+ relationFilter
+ Filter relation semantic
+
+
+ maxNumberOfPid
+ filter relation with at least #maxNumberOfPid
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ Import JSONRDD to Dataset kryo
+ eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=3000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${sourcePath}
+ --targetPath${targetPath}
+ --filterRelation${relationFilter}
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Convert Entities to summaries
+ eu.dnetlib.dhp.sx.graph.SparkCreateSummaryObject
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=20000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/entities
+ --targetPath${targetPath}/provision/summaries
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Generate Scholix Dataset
+ eu.dnetlib.dhp.sx.graph.SparkCreateScholix
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=30000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --summaryPath${targetPath}/provision/summaries
+ --targetPath${targetPath}/provision/scholix
+ --relationPath${targetPath}/relation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize scholix to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/scholix/scholix
+ --targetPath${targetPath}/json/scholix_json
+ --objectTypescholix
+ --maxPidNumberFiltermaxNumberOfPid
+
+
+
+
+
+
+
+
+
\ No newline at end of file
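For orientation only (this note and sketch are not part of the patch): the workflow chains four Spark jobs — import the JSON graph as typed Datasets while filtering relations, build summaries, build the Scholix dataset, and serialize it to gzipped JSON. The sketch below drives the same classes with the same argument names the workflow passes; the local master, the /tmp paths and the "references" semantics are assumptions for local experimentation, not values taken from the workflow.

    // Illustrative local run of the four steps wired together by the workflow above.
    // Local master, paths and the example relation semantics are assumptions.
    object DumpScholixLocalRun {

      def main(args: Array[String]): Unit = {
        val source = "/tmp/raw_graph"    // hypothetical sourcePath
        val target = "/tmp/scholix_dump" // hypothetical targetPath

        // 1. JSON RDD graph -> typed Datasets, keeping only one relation semantics
        eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset.main(
          Array("--master", "local[*]", "--sourcePath", source, "--targetPath", target,
            "--filterRelation", "references")
        )

        // 2. Entities -> Scholix summaries
        eu.dnetlib.dhp.sx.graph.SparkCreateSummaryObject.main(
          Array("--master", "local[*]", "--sourcePath", s"$target/entities",
            "--targetPath", s"$target/provision/summaries")
        )

        // 3. Summaries + relations -> Scholix dataset
        eu.dnetlib.dhp.sx.graph.SparkCreateScholix.main(
          Array("--master", "local[*]", "--summaryPath", s"$target/provision/summaries",
            "--relationPath", s"$target/relation", "--targetPath", s"$target/provision/scholix")
        )

        // 4. Scholix -> gzipped JSON dump, optionally dropping records with too many pids per side
        eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson.main(
          Array("--master", "local[*]", "--sourcePath", s"$target/provision/scholix/scholix",
            "--targetPath", s"$target/json/scholix_json", "--objectType", "scholix",
            "--maxPidNumberFilter", "5")
        )
      }
    }

On the cluster the same values are supplied through the workflow properties (sourcePath, targetPath, relationFilter, maxNumberOfPid).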
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
index bfa07eb69..6695ebd3c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.sx.scholix.Scholix
 import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
+import eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson.toInt
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.SparkConf
@@ -12,6 +13,14 @@ import org.slf4j.{Logger, LoggerFactory}
 
 object SparkConvertObjectToJson {
 
+  def toInt(s: String): Option[Int] = {
+    try {
+      Some(s.toInt)
+    } catch {
+      case e: Exception => None
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     val log: Logger = LoggerFactory.getLogger(getClass)
     val conf: SparkConf = new SparkConf()
@@ -37,6 +46,8 @@ object SparkConvertObjectToJson {
     log.info(s"objectType -> $objectType")
     val scholixUpdatePath = parser.get("scholixUpdatePath")
     log.info(s"scholixUpdatePath -> $scholixUpdatePath")
+    val maxPidNumberFilter = parser.get("maxPidNumberFilter")
+    log.info(s"maxPidNumberFilter -> $maxPidNumberFilter")
 
     implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
     implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
@@ -47,12 +58,22 @@ object SparkConvertObjectToJson {
       case "scholix" =>
         log.info("Serialize Scholix")
         val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
-        val u: Dataset[Scholix] = spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
-        d.union(u)
-          .repartition(8000)
-          .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
-          .rdd
-          .saveAsTextFile(targetPath, classOf[GzipCodec])
+//        val u: Dataset[Scholix] = spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
+        if (maxPidNumberFilter != null && toInt(maxPidNumberFilter).isDefined) {
+          val mp = toInt(maxPidNumberFilter).get
+          d
+            .filter(s => (s.getSource.getIdentifier.size() <= mp) && (s.getTarget.getIdentifier.size() <= mp))
+            .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
+            .rdd
+            .saveAsTextFile(targetPath, classOf[GzipCodec])
+        } else {
+          d
+            .repartition(8000)
+            .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
+            .rdd
+            .saveAsTextFile(targetPath, classOf[GzipCodec])
+        }
+
       case "summary" =>
         log.info("Serialize Summary")
         val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
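A short illustrative note on the guard above (not part of the patch): toInt turns any unparsable --maxPidNumberFilter value into None, so the job silently falls back to the plain repartitioned dump. The sketch below expresses the same behaviour with scala.util.Try and factors the per-record predicate out; the object and method names are hypothetical.

    // Sketch of the guard used above: scala.util.Try expresses the same
    // "None on parse failure" behaviour as the try/catch in toInt.
    import scala.util.Try

    import eu.dnetlib.dhp.schema.sx.scholix.Scholix

    object MaxPidFilterSketch {

      def toInt(s: String): Option[Int] = Try(s.toInt).toOption

      // Same per-record predicate as the "scholix" branch above, factored out:
      // keep a Scholix only when both sides carry at most `mp` persistent identifiers.
      def keepByPidCount(s: Scholix, mp: Int): Boolean =
        s.getSource.getIdentifier.size() <= mp && s.getTarget.getIdentifier.size() <= mp

      def main(args: Array[String]): Unit = {
        println(toInt("5"))            // Some(5)  -> pid-count filter is applied
        println(toInt("not-a-number")) // None     -> filter skipped, plain repartitioned dump
      }
    }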
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
index bd970a5cf..f72f8dd16 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
+
 import scala.collection.JavaConverters._
 
 object SparkConvertRDDtoDataset {
@@ -34,6 +36,9 @@ object SparkConvertRDDtoDataset {
     val t = parser.get("targetPath")
     log.info(s"targetPath -> $t")
 
+    val filterRelation = parser.get("filterRelation")
+    log.info(s"filterRelation -> $filterRelation")
+
     val entityPath = s"$t/entities"
     val relPath = s"$t/relation"
     val mapper = new ObjectMapper()
@@ -94,28 +99,46 @@
 
     log.info("Converting Relation")
 
-    val relationSemanticFilter = List(
-//      "cites",
-//      "iscitedby",
-      "merges",
-      "ismergedin",
-      "HasAmongTopNSimilarDocuments",
-      "IsAmongTopNSimilarDocuments"
-    )
+    if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) {
 
-    val rddRelation = spark.sparkContext
-      .textFile(s"$sourcePath/relation")
-      .map(s => mapper.readValue(s, classOf[Relation]))
-      .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
-      .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
-      //filter OpenCitations relations
-      .filter(r =>
-        r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
-          "opencitations".equalsIgnoreCase(k.getValue)
+      val rddRelation = spark.sparkContext
+        .textFile(s"$sourcePath/relation")
+        .map(s => mapper.readValue(s, classOf[Relation]))
+        .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
+        .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+        //filter OpenCitations relations
+        .filter(r =>
+          r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
+            "opencitations".equalsIgnoreCase(k.getValue)
+          )
         )
+        .filter(r => r.getRelClass != null && r.getRelClass.equalsIgnoreCase(filterRelation))
+      spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+    } else {
+
+      val relationSemanticFilter = List(
+        // "cites",
+        // "iscitedby",
+        "merges",
+        "ismergedin",
+        "HasAmongTopNSimilarDocuments",
+        "IsAmongTopNSimilarDocuments"
      )
-      .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
-    spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+
+      val rddRelation = spark.sparkContext
+        .textFile(s"$sourcePath/relation")
+        .map(s => mapper.readValue(s, classOf[Relation]))
+        .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
+        .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+        //filter OpenCitations relations
+        .filter(r =>
+          r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
+            "opencitations".equalsIgnoreCase(k.getValue)
+          )
+        )
+        .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
+      spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+    }
 
   }
 }
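Finally, an illustrative refactor sketch (not part of the patch) of the relation filtering above: both branches share the same base predicates and differ only in how relClass is tested; the helper object and method names are hypothetical.

    // Hypothetical sketch of the relation filtering above: the base predicates
    // (not deleted by inference, result-to-result only, not collected from OpenCitations)
    // are shared, and only the relClass test differs between the two branches.
    import eu.dnetlib.dhp.schema.oaf.Relation

    import scala.collection.JavaConverters._

    object RelationFilters {

      // Predicates common to both branches of SparkConvertRDDtoDataset
      def baseFilter(r: Relation): Boolean =
        r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false &&
          r.getSource.startsWith("50") && r.getTarget.startsWith("50") &&
          r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 &&
          !r.getCollectedfrom.asScala.exists(k => "opencitations".equalsIgnoreCase(k.getValue))

      // When --filterRelation is set: keep only that relClass
      def keepOnly(relClass: String)(r: Relation): Boolean =
        r.getRelClass != null && r.getRelClass.equalsIgnoreCase(relClass)

      // Default behaviour: drop the semantics in the deny list
      val relationSemanticFilter =
        List("merges", "ismergedin", "HasAmongTopNSimilarDocuments", "IsAmongTopNSimilarDocuments")

      def dropDenied(r: Relation): Boolean =
        !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))
    }

With such helpers the two branches would reduce to rdd.filter(baseFilter).filter(keepOnly(filterRelation)) and rdd.filter(baseFilter).filter(dropDenied) respectively.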