diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 2cd176dee..cb41d6134 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -1,7 +1,8 @@ package eu.dnetlib.dhp.sx.graph -import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper + +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -9,12 +10,7 @@ import org.slf4j.{Logger, LoggerFactory} object SparkConvertRDDtoDataset { def main(args: Array[String]): Unit = { - val entities = List( - ("dataset", classOf[OafDataset]), - ("otherresearchproduct", classOf[OtherResearchProduct]), - ("publication", classOf[Publication]), - ("software", classOf[Software]) - ) + val log: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() @@ -29,15 +25,43 @@ object SparkConvertRDDtoDataset { val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") - val targetPath = parser.get("targetPath") - log.info(s"targetPath -> $targetPath") - val mapper = new ObjectMapper() - implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val t = parser.get("targetPath") + log.info(s"targetPath -> $t") + + val entityPath = s"$t/entities" + val relPath = s"$t/relation" + val mapper = new ObjectMapper() + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + + + log.info("Converting dataset") + val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])) + spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset") + + + log.info("Converting publication") + val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])) + spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication") + + log.info("Converting software") + val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])) + spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software") + + log.info("Converting otherresearchproduct") + val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])) + spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct") + + + log.info("Converting Relation") + + + val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation])) + spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") + - entities.foreach{ - e => - val rdd =spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2)) - spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}") - } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala index a66da3e6d..ac189b6ba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.sx.graph import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Result +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import org.apache.commons.io.IOUtils @@ -29,11 +29,12 @@ object SparkCreateSummaryObject { log.info(s"targetPath -> $targetPath") implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result] + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] - val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result] + val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 82bf3c50e..b2fddec20 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -57,10 +57,10 @@ object SparkResolveRelation { currentRelation }.write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedSource") + .save(s"$workingPath/relationResolvedSource") - val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ m => val targetResolved = m._2 diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 4e601bff3..d8eb1fc80 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -35,7 +35,7 @@ --masteryarn --sourcePath${sourcePath} - --targetPath${targetPath}/entities + --targetPath${targetPath} @@ -87,7 +87,7 @@ --masteryarn --summaryPath${targetPath}/provision/summaries --targetPath${targetPath}/provision/scholix - --relationPath${sourcePath}/relation_resolved + --relationPath${targetPath}/relation