fixed defintion wf of creation final infospace of scholexplorer

This commit is contained in:
Sandro La Bruzzo 2021-07-25 11:15:37 +02:00
parent 3920c69bc8
commit 8fac10c91e
4 changed files with 48 additions and 23 deletions

View File

@ -1,7 +1,8 @@
package eu.dnetlib.dhp.sx.graph
import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -9,12 +10,7 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkConvertRDDtoDataset {
def main(args: Array[String]): Unit = {
val entities = List(
("dataset", classOf[OafDataset]),
("otherresearchproduct", classOf[OtherResearchProduct]),
("publication", classOf[Publication]),
("software", classOf[Software])
)
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
@ -29,15 +25,43 @@ object SparkConvertRDDtoDataset {
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val mapper = new ObjectMapper()
implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val t = parser.get("targetPath")
log.info(s"targetPath -> $t")
val entityPath = s"$t/entities"
val relPath = s"$t/relation"
val mapper = new ObjectMapper()
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
log.info("Converting dataset")
val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset]))
spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset")
log.info("Converting publication")
val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication]))
spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication")
log.info("Converting software")
val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software]))
spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software")
log.info("Converting otherresearchproduct")
val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct]))
spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct")
log.info("Converting Relation")
val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation]))
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
entities.foreach{
e =>
val rdd =spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2))
spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}")
}
}
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Result
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import org.apache.commons.io.IOUtils
@ -29,11 +29,12 @@ object SparkCreateSummaryObject {
log.info(s"targetPath -> $targetPath")
implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result]
val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)

View File

@ -57,10 +57,10 @@ object SparkResolveRelation {
currentRelation
}.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedSource")
.save(s"$workingPath/relationResolvedSource")
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
m =>
val targetResolved = m._2

View File

@ -35,7 +35,7 @@
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}/entities</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="CreateSummaries"/>
<error to="Kill"/>
@ -87,7 +87,7 @@
<arg>--master</arg><arg>yarn</arg>
<arg>--summaryPath</arg><arg>${targetPath}/provision/summaries</arg>
<arg>--targetPath</arg><arg>${targetPath}/provision/scholix</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation_resolved</arg>
<arg>--relationPath</arg><arg>${targetPath}/relation</arg>
</spark>
<ok to="DropJSONPath"/>