diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json index 336181e0a..c56e130c0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json @@ -2,5 +2,6 @@ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the relation resolved Path", "paramRequired": true}, {"paramName":"s", "paramLongName":"summaryPath", "paramDescription": "the summary Path", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target base path of the scholix", "paramRequired": true} + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target base path of the scholix", "paramRequired": true}, + {"paramName":"dc", "paramLongName":"dumpCitations", "paramDescription": "should dump citation relations", "paramRequired": false} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml index a37d85ad4..8135c9815 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml @@ -16,7 +16,11 @@ maxNumberOfPid filter relation with at least #maxNumberOfPid - + + dumpCitations + false + should dump citation relations + @@ -98,6 +102,7 @@ --summaryPath${targetPath}/provision/summaries --targetPath${targetPath}/provision/scholix --relationPath${targetPath}/relation + --dumpCitations${dumpCitations} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index a54acf1d3..556106180 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -1,6 +1,8 @@ package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} @@ -10,7 +12,8 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} -import scala.collection.JavaConverters._ +import scala.reflect.ClassTag +import scala.util.Try object SparkConvertRDDtoDataset { @@ -37,11 +40,12 @@ object SparkConvertRDDtoDataset { val t = parser.get("targetPath") log.info(s"targetPath -> $t") - val filterRelation = parser.get("filterRelation") - log.info(s"filterRelation -> $filterRelation") + val subRelTypeFilter = parser.get("filterRelation") + log.info(s"filterRelation -> $subRelTypeFilter") val entityPath = s"$t/entities" val relPath = s"$t/relation" + val mapper = new ObjectMapper() implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) @@ -100,7 +104,7 @@ object SparkConvertRDDtoDataset { log.info("Converting Relation") - val relationSemanticFilter = List( + val relClassFilter = List( ModelConstants.MERGES, ModelConstants.IS_MERGED_IN, ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, @@ -110,22 +114,56 @@ object SparkConvertRDDtoDataset { val rddRelation = spark.sparkContext .textFile(s"$sourcePath/relation") .map(s => mapper.readValue(s, classOf[Relation])) - .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) + .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference) .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) + .filter(r => filterRelations(subRelTypeFilter, relClassFilter, r)) //filter OpenCitations relations .filter(r => r.getDataInfo.getProvenanceaction != null && !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) ) - .filter(r => filterRelations(filterRelation, relationSemanticFilter, r)) + spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") } - private def filterRelations(filterRelation: String, relationSemanticFilter: List[String], r: Relation): Boolean = { - if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) { - r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation) + private def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Relation): Boolean = { + if (StringUtils.isNotBlank(subRelTypeFilter)) { + subRelTypeFilter.equalsIgnoreCase(r.getSubRelType) } else { - !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)) + !relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)) } } + + /* + //TODO: finalise implementation + private def processResult[T<: Result]( + implicit ct: ClassTag[T], + log: Logger, + spark: SparkSession, + sourcePath: String, + entityPath: String, + clazz: Class[T] + ): Unit = { + val entityType = clazz.getSimpleName.toLowerCase + + log.info(s"Converting $entityType") + + val mapper = new ObjectMapper() with ScalaObjectMapper + mapper.registerModule(DefaultScalaModule) + + val rdd = spark.sparkContext + .textFile(s"$sourcePath/$entityType") + .map(s => mapper.readValue(s, clazz)) + .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference); + + implicit val encoder: Encoder[T] = Encoders.kryo(clazz) + spark + .createDataset(rdd) + .as[T] + .write + .mode(SaveMode.Overwrite) + .save(s"$entityPath/$entityType") + } + */ + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala index af19b9698..fd06e7dea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala @@ -12,6 +12,8 @@ import org.apache.spark.sql.functions.count import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} +import scala.util.Try + object SparkCreateScholix { def main(args: Array[String]): Unit = { @@ -37,6 +39,8 @@ object SparkCreateScholix { log.info(s"summaryPath -> $summaryPath") val targetPath = parser.get("targetPath") log.info(s"targetPath -> $targetPath") + val dumpCitations = Try(parser.get("dumpCitations").toBoolean).getOrElse(false) + log.info(s"dumpCitations -> $dumpCitations") implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation] implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] @@ -138,7 +142,7 @@ object SparkCreateScholix { val relatedEntitiesDS: Dataset[RelatedEntities] = spark.read .load(s"$targetPath/related_entities") .as[RelatedEntities] - .filter(r => r.relatedPublication > 0 || r.relatedDataset > 0) + .filter(r => dumpCitations || r.relatedPublication > 0 || r.relatedDataset > 0) relatedEntitiesDS .joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner")