forked from D-Net/dnet-hadoop
Implemented a new version of Scholix that uses only the required index fields and moves the resource and target metadata to another index to reduce space.
This commit is contained in:
parent 0c934d3c39
commit 844a31f7a6
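The key idea of the change: keep only the fields the link index actually needs on each Scholix record, and resolve the full source and target metadata from a separate summary index. A minimal sketch of what such a flat record looks like, with hypothetical field names (the real schema is `eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat`, imported in the diff below):

```scala
// Illustrative sketch only: these field names are hypothetical,
// not the actual ScholixFlat schema referenced by this commit.
case class FlatLinkSketch(
  identifier: String,   // hash identifying the (source, relation, target) triple
  relationType: String, // relation semantics, e.g. "references"
  sourceId: String,     // key into the separate summary/resource index
  targetId: String      // key into the separate summary/resource index
)
```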
@@ -1,7 +1,15 @@
 package eu.dnetlib.dhp.sx.graph
 
-import eu.dnetlib.dhp.schema.oaf.{KeyValue, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{
+  KeyValue,
+  OtherResearchProduct,
+  Publication,
+  Relation,
+  Result,
+  Software,
+  Dataset => OafDataset
+}
+import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
 import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
 import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
@@ -22,7 +30,7 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
     log.info("targetPath: {}", targetPath)
     generateBidirectionalRelations(sourcePath, targetPath, spark)
     generateScholixResource(sourcePath, targetPath, spark)
-    generateScholix(targetPath, spark)
+    generateFlatScholix(targetPath, spark)
   }
 
   def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = {
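Each stage persists its output under a subdirectory of targetPath and the next stage reads it back from disk, which keeps the Spark lineage short and makes every stage independently re-runnable. A self-contained sketch of that write-then-reload handoff (paths and data here are hypothetical):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object StageHandoffSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("handoff-sketch").getOrCreate()
    import spark.implicits._
    val targetPath = "/tmp/scholix-sketch" // hypothetical output root

    // Stage 1: compute an intermediate dataset and persist it to disk.
    Seq(("s1", "t1"), ("t1", "s1")).toDF("source", "target")
      .write.mode(SaveMode.Overwrite).save(s"$targetPath/relation")

    // Stage 2: reload the intermediate instead of holding it in memory,
    // mirroring how generateFlatScholix consumes the earlier stages' output.
    val relations = spark.read.load(s"$targetPath/relation")
    relations.show()

    spark.stop()
  }
}
```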
@@ -101,18 +109,34 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
     val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
     val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]
 
-    val summaries =resource.map(s=> ScholexplorerUtils.generateSummaryResource(s))
+    resource
+      .map(s => ScholexplorerUtils.generateSummaryResource(s))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/summary")
+    val summaries = spark.read.load(s"$outputPath/summary").as[SummaryResource]
 
-    val scholix_source =relations
-      .joinWith(summaries, relations("source")=== summaries("id"))
+    relations
+      .joinWith(summaries, relations("source") === summaries("id"))
+      .map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, true))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/scholix_source")
 
-    val scholix_target =relations.joinWith(summaries, relations("target")=== summaries("id"))
+    val scholix_source = spark.read.load(s"$outputPath/scholix_source").as[ScholixFlat]
+
+    relations
+      .joinWith(summaries, relations("target") === summaries("id"))
+      .map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, false))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/scholix_target")
+
+    val scholix_target = spark.read.load(s"$outputPath/scholix_target").as[ScholixFlat]
 
     scholix_source
-      .joinWith(scholix_target, scholix_source("identifier")===scholix_target("identifier"), "inner")
-      .map(s =>ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
+      .joinWith(scholix_target, scholix_source("identifier") === scholix_target("identifier"), "inner")
+      .map(s => ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
       .write
       .mode(SaveMode.Overwrite)
       .option("compression", "gzip")
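The flat dump is built in two halves: one join fills in the source summary for each relation, another fills in the target summary, and an inner join on the shared identifier stitches the halves together. A simplified, self-contained sketch of that shape, with stand-in types for RelationInfo, SummaryResource, and ScholixFlat (the intermediate saves from the diff are elided, and the final map inlines what ScholexplorerUtils.mergeScholixFlat would do):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object JoinMergeSketch {
  // Simplified stand-ins for RelationInfo / SummaryResource / ScholixFlat.
  case class Rel(id: String, source: String, target: String)
  case class Summary(id: String, title: String)
  case class Flat(identifier: String, sourceTitle: String, targetTitle: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-merge-sketch").getOrCreate()
    import spark.implicits._

    val relations = Seq(Rel("r1", "s1", "t1")).toDS()
    val summaries = Seq(Summary("s1", "Source title"), Summary("t1", "Target title")).toDS()

    // Half-records: one pass fills the source side, the other the target side,
    // like generateScholixFlat(k._1, k._2, true) vs. (..., false) in the diff.
    val halfSource: Dataset[Flat] = relations
      .joinWith(summaries, relations("source") === summaries("id"))
      .map { case (r, s) => Flat(r.id, s.title, null) }
    val halfTarget: Dataset[Flat] = relations
      .joinWith(summaries, relations("target") === summaries("id"))
      .map { case (r, s) => Flat(r.id, null, s.title) }

    // Merge the two halves on the shared identifier (inner join),
    // copying the target side into the source-side record.
    halfSource
      .joinWith(halfTarget, halfSource("identifier") === halfTarget("identifier"), "inner")
      .map { case (a, b) => a.copy(targetTitle = b.targetTitle) }
      .show()

    spark.stop()
  }
}
```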
@@ -16,7 +16,7 @@ class ScholixGenerationTest {
     val app = new SparkCreateScholexplorerDump(null, null, null)
 
     val basePath = "/Users/sandro/Downloads"
     app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
     app.generateBidirectionalRelations(
       s"$basePath/scholix_sample/",
       s"$basePath/scholix/",
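The test drives the dump against a local sample and assumes a spark session in scope. A hedged sketch of the kind of local session such a test typically builds (the actual fixture in ScholixGenerationTest may configure this differently):

```scala
import org.apache.spark.sql.SparkSession

// Assumed local-mode session; the real test fixture may differ.
val spark: SparkSession = SparkSession
  .builder()
  .appName("ScholixGenerationTest")
  .master("local[*]")
  .getOrCreate()
```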