diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
index 8bfdde5b0..3a2f90708 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
@@ -1,5 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
+ {"paramName":"r", "paramLongName":"filterRelation", "paramDescription": "the relation to filter", "paramRequired": false}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
index 890570a0b..cbb20bfe7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
@@ -3,5 +3,7 @@
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
- {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
+ {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true},
+ {"paramName":"mp", "paramLongName":"maxPidNumberFilter", "paramDescription": "filter max number of pids in source/target", "paramRequired": false}
+
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml
new file mode 100644
index 000000000..6fb2a1253
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
new file mode 100644
index 000000000..a37d85ad4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
@@ -0,0 +1,145 @@
+
+
+
+ sourcePath
+ the working dir base path
+
+
+ targetPath
+ the final graph path
+
+
+ relationFilter
+ Filter relation semantic
+
+
+ maxNumberOfPid
+ filter relation with at least #maxNumberOfPid
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ Import JSONRDD to Dataset kryo
+ eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=3000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${sourcePath}
+ --targetPath${targetPath}
+ --filterRelation${relationFilter}
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Convert Entities to summaries
+ eu.dnetlib.dhp.sx.graph.SparkCreateSummaryObject
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=20000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/entities
+ --targetPath${targetPath}/provision/summaries
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Generate Scholix Dataset
+ eu.dnetlib.dhp.sx.graph.SparkCreateScholix
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=30000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --summaryPath${targetPath}/provision/summaries
+ --targetPath${targetPath}/provision/scholix
+ --relationPath${targetPath}/relation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize scholix to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/scholix/scholix
+ --targetPath${targetPath}/json/scholix_json
+ --objectTypescholix
+ --maxPidNumberFiltermaxNumberOfPid
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
index bfa07eb69..6695ebd3c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
+import eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson.toInt
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
@@ -12,6 +13,14 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkConvertObjectToJson {
+ def toInt(s: String): Option[Int] = {
+ try {
+ Some(s.toInt)
+ } catch {
+ case e: Exception => None
+ }
+ }
+
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
@@ -37,6 +46,8 @@ object SparkConvertObjectToJson {
log.info(s"objectType -> $objectType")
val scholixUpdatePath = parser.get("scholixUpdatePath")
log.info(s"scholixUpdatePath -> $scholixUpdatePath")
+ val maxPidNumberFilter = parser.get("maxPidNumberFilter")
+ log.info(s"maxPidNumberFilter -> $maxPidNumberFilter")
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
@@ -47,12 +58,22 @@ object SparkConvertObjectToJson {
case "scholix" =>
log.info("Serialize Scholix")
val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
- val u: Dataset[Scholix] = spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
- d.union(u)
- .repartition(8000)
- .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
- .rdd
- .saveAsTextFile(targetPath, classOf[GzipCodec])
+// val u: Dataset[Scholix] = spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
+ if (maxPidNumberFilter != null && toInt(maxPidNumberFilter).isDefined) {
+ val mp = toInt(maxPidNumberFilter).get
+ d
+ .filter(s => (s.getSource.getIdentifier.size() <= mp) && (s.getTarget.getIdentifier.size() <= mp))
+ .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
+ .rdd
+ .saveAsTextFile(targetPath, classOf[GzipCodec])
+ } else {
+ d
+ .repartition(8000)
+ .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
+ .rdd
+ .saveAsTextFile(targetPath, classOf[GzipCodec])
+ }
+
case "summary" =>
log.info("Serialize Summary")
val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
index bd970a5cf..0073afff5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
+
import scala.collection.JavaConverters._
object SparkConvertRDDtoDataset {
@@ -34,6 +36,9 @@ object SparkConvertRDDtoDataset {
val t = parser.get("targetPath")
log.info(s"targetPath -> $t")
+ val filterRelation = parser.get("filterRelation")
+ log.info(s"filterRelation -> $filterRelation")
+
val entityPath = s"$t/entities"
val relPath = s"$t/relation"
val mapper = new ObjectMapper()
@@ -94,28 +99,44 @@ object SparkConvertRDDtoDataset {
log.info("Converting Relation")
- val relationSemanticFilter = List(
-// "cites",
-// "iscitedby",
- "merges",
- "ismergedin",
- "HasAmongTopNSimilarDocuments",
- "IsAmongTopNSimilarDocuments"
- )
+ if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) {
- val rddRelation = spark.sparkContext
- .textFile(s"$sourcePath/relation")
- .map(s => mapper.readValue(s, classOf[Relation]))
- .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
- .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
- //filter OpenCitations relations
- .filter(r =>
- r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
- "opencitations".equalsIgnoreCase(k.getValue)
+ val rddRelation = spark.sparkContext
+ .textFile(s"$sourcePath/relation")
+ .map(s => mapper.readValue(s, classOf[Relation]))
+ .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
+ .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+ //filter OpenCitations relations
+ .filter(r =>
+ r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
+ "opencitations".equalsIgnoreCase(k.getValue)
+ )
)
+ .filter(r => r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation))
+ spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+ } else {
+
+ val relationSemanticFilter = List(
+ "merges",
+ "ismergedin",
+ "HasAmongTopNSimilarDocuments",
+ "IsAmongTopNSimilarDocuments"
)
- .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
- spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+
+ val rddRelation = spark.sparkContext
+ .textFile(s"$sourcePath/relation")
+ .map(s => mapper.readValue(s, classOf[Relation]))
+ .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
+ .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+ //filter OpenCitations relations
+ .filter(r =>
+ r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
+ "opencitations".equalsIgnoreCase(k.getValue)
+ )
+ )
+ .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
+ spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+ }
}
}