forked from D-Net/dnet-hadoop
Implemented a new version of Scholix that uses only the required index fields and moves the resource and target metadata to another index to reduce space.
This commit is contained in:
parent
a02f3f0d2b
commit
0c934d3c39
|
@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
|
||||||
import eu.dnetlib.dhp.schema.sx.scholix.{
|
import eu.dnetlib.dhp.schema.sx.scholix.{
|
||||||
Scholix,
|
Scholix,
|
||||||
ScholixCollectedFrom,
|
ScholixCollectedFrom,
|
||||||
|
@ -10,6 +11,7 @@ import eu.dnetlib.dhp.schema.sx.scholix.{
|
||||||
ScholixRelationship,
|
ScholixRelationship,
|
||||||
ScholixResource
|
ScholixResource
|
||||||
}
|
}
|
||||||
|
import org.apache.logging.log4j.core.appender.ConsoleAppender.Target
|
||||||
import org.json4s
|
import org.json4s
|
||||||
import org.json4s.DefaultFormats
|
import org.json4s.DefaultFormats
|
||||||
import org.json4s.jackson.JsonMethods.parse
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
@ -26,6 +28,16 @@ case class RelationInfo(
|
||||||
) {}
|
) {}
|
||||||
case class RelKeyValue(key: String, value: String) {}
|
case class RelKeyValue(key: String, value: String) {}
|
||||||
|
|
||||||
|
/** Compact summary of a Scholix resource, as produced by
  * `ScholexplorerUtils.generateSummaryResource`.
  *
  * @param id         identifier of the resource (taken from `getDnetIdentifier`)
  * @param typology   object type of the resource (taken from `getObjectType`)
  * @param subType    object sub-type of the resource (taken from `getObjectSubType`)
  * @param pids       distinct identifier values of the resource
  * @param pidTypes   distinct identifier schemas of the resource; this list is
  *                   de-duplicated independently of `pids`, so the two lists
  *                   are not positionally aligned
  * @param publishers publisher names of the resource
  * @param date       publication date of the resource
  */
case class SummaryResource(
  id: String,
  typology: String,
  subType: String,
  pids: List[String],
  pidTypes: List[String],
  publishers: List[String],
  date: String
)
|
||||||
|
|
||||||
object ScholexplorerUtils {
|
object ScholexplorerUtils {
|
||||||
|
|
||||||
val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
|
val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
|
||||||
|
@ -86,6 +98,99 @@ object ScholexplorerUtils {
|
||||||
.toList
|
.toList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Builds the compact [[SummaryResource]] view of a [[ScholixResource]].
  *
  * Identifier values and identifier schemas are de-duplicated independently,
  * so the resulting `pids` and `pidTypes` lists are not positionally aligned.
  * At most five publisher names are kept (distinct, in ascending order).
  *
  * @param input the resource to summarise; its identifier and publisher
  *              lists may be null, in which case the corresponding summary
  *              lists are empty
  * @return the summary of `input`
  */
def generateSummaryResource(input: ScholixResource): SummaryResource = {
  // Option(...) turns a null Java list into an empty Scala list.
  val identifiers = Option(input.getIdentifier).toList.flatMap(_.asScala)
  val distinctIds = identifiers.map(_.getIdentifier).distinct
  val distinctTypes = identifiers.map(_.getSchema).distinct

  val distinctPublishers = Option(input.getPublisher).toList
    .flatMap(_.asScala)
    .map(_.getName)
    // A null name would make `sorted` throw a NullPointerException as soon as
    // two or more names have to be compared, so null names are dropped first.
    .filter(_ != null)
    .distinct
    .sorted
    .take(5)

  SummaryResource(
    id = input.getDnetIdentifier,
    typology = input.getObjectType,
    subType = input.getObjectSubType,
    pids = distinctIds,
    pidTypes = distinctTypes,
    publishers = distinctPublishers,
    date = input.getPublicationDate
  )
}
|
||||||
|
|
||||||
|
/** Creates one half of a flat Scholix record from a relation and the summary
  * of one of its end points.
  *
  * The record always carries the relation identifier and up to five link
  * providers (distinct, sorted values of `relation.collectedfrom`, or the
  * single provider "OpenAIRE" when none is available).
  *
  * @param relation     the relation the record is built for
  * @param summary      summary of the resource on one side of the relation
  * @param updateSource when true the summary fills the source fields of the
  *                     record, otherwise it fills the target fields
  * @return the partially populated record, or null when the relation class
  *         is not present in `relations` or the summary has no pids
  */
def generateScholixFlat(relation: RelationInfo, summary: SummaryResource, updateSource: Boolean): ScholixFlat = {
  val flat = new ScholixFlat
  flat.setIdentifier(relation.id)

  val providers = relation.collectedfrom
  if (providers == null || providers.isEmpty)
    flat.setLinkProviders(List("OpenAIRE").asJava)
  else
    flat.setLinkProviders(providers.map(_.value).distinct.sorted.take(5).toList.asJava)

  val semantics = relations.getOrElse(relation.relclass.toLowerCase, null)

  // Unknown relation classes and end points without any pid yield no record.
  if (semantics == null || summary.pids.isEmpty) null
  else {
    flat.setRelationType(semantics.original)
    flat.setPublicationDate(summary.date)

    val pidValues = summary.pids.asJava
    val pidSchemas = summary.pidTypes.asJava
    val hasPublishers = summary.publishers.nonEmpty

    if (updateSource) {
      flat.setSourceId(summary.id)
      flat.setSourcePid(pidValues)
      flat.setSourcePidType(pidSchemas)
      flat.setSourceType(summary.typology)
      flat.setSourceSubType(summary.subType)
      if (hasPublishers) flat.setSourcePublisher(summary.publishers.asJava)
    } else {
      flat.setTargetId(summary.id)
      flat.setTargetPid(pidValues)
      flat.setTargetPidType(pidSchemas)
      flat.setTargetType(summary.typology)
      flat.setTargetSubType(summary.subType)
      if (hasPublishers) flat.setTargetPublisher(summary.publishers.asJava)
    }
    flat
  }
}
|
||||||
|
|
||||||
|
/** Merges the two halves of a flat Scholix record that describe the same link.
  *
  * `source` is updated in place: it receives the target-side fields of
  * `target`, inherits the publication date of `target` when it has none of
  * its own, and ends up with the union of both link-provider lists (distinct,
  * sorted, at most five entries).
  *
  * @param source the record holding the source-side fields; mutated and returned
  * @param target the record holding the target-side fields
  * @return `source`, enriched with the target-side information
  */
def mergeScholixFlat(source: ScholixFlat, target: ScholixFlat): ScholixFlat = {
  if (source.getPublicationDate == null) {
    source.setPublicationDate(target.getPublicationDate)
  }

  source.setTargetId(target.getTargetId)
  source.setTargetPid(target.getTargetPid)
  source.setTargetPidType(target.getTargetPidType)
  source.setTargetType(target.getTargetType)
  source.setTargetSubType(target.getTargetSubType)
  // The publisher is target-side metadata like the fields above, so it is
  // copied unconditionally; previously it was gated on the link providers.
  source.setTargetPublisher(target.getTargetPublisher)

  // The provider merge used to sit in an `else if` that repeated the
  // condition of the preceding `if`, so it could never run.
  if (source.getLinkProviders != null && target.getLinkProviders != null) {
    source.setLinkProviders(
      (source.getLinkProviders.asScala ++ target.getLinkProviders.asScala).distinct.sorted.take(5).asJava
    )
  }

  source
}
|
||||||
|
|
||||||
def generateScholixResourceFromResult(result: Result): ScholixResource = {
|
def generateScholixResourceFromResult(result: Result): ScholixResource = {
|
||||||
|
|
||||||
if (result.getInstance() == null || result.getInstance().size() == 0)
|
if (result.getInstance() == null || result.getInstance().size() == 0)
|
||||||
|
|
|
@ -1,15 +1,8 @@
|
||||||
package eu.dnetlib.dhp.sx.graph
|
package eu.dnetlib.dhp.sx.graph
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||||
import eu.dnetlib.dhp.schema.oaf.{
|
import eu.dnetlib.dhp.schema.oaf.{KeyValue, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
|
||||||
KeyValue,
|
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
|
||||||
OtherResearchProduct,
|
|
||||||
Publication,
|
|
||||||
Relation,
|
|
||||||
Result,
|
|
||||||
Software,
|
|
||||||
Dataset => OafDataset
|
|
||||||
}
|
|
||||||
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
|
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
|
||||||
import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
|
import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
|
@ -101,6 +94,31 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Generates the flat Scholix dump.
  *
  * Reads the relations from `$outputPath/relation` and the resources from
  * `$outputPath/resource`, builds the source half and the target half of each
  * flat record by joining the relations with the resource summaries, merges
  * the two halves on the record identifier and writes the result as
  * gzip-compressed JSON to `$outputPath/scholix`, overwriting existing output.
  *
  * @param outputPath base path holding the `relation` and `resource` inputs
  *                   and receiving the `scholix` output
  * @param spark      the active Spark session
  */
def generateFlatScholix(outputPath: String, spark: SparkSession): Unit = {
  import spark.implicits._
  implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
  implicit val scholixEncoder: Encoder[ScholixFlat] = Encoders.bean(classOf[ScholixFlat])

  val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
  val summaries = spark.read
    .load(s"$outputPath/resource")
    .as[ScholixResource]
    .map(res => ScholexplorerUtils.generateSummaryResource(res))

  // NOTE(review): generateScholixFlat may return null (unknown relation class
  // or summary without pids) — confirm how the bean encoder handles such rows.
  val sourceHalves = relations
    .joinWith(summaries, relations("source") === summaries("id"))
    .map(pair => ScholexplorerUtils.generateScholixFlat(pair._1, pair._2, updateSource = true))

  val targetHalves = relations
    .joinWith(summaries, relations("target") === summaries("id"))
    .map(pair => ScholexplorerUtils.generateScholixFlat(pair._1, pair._2, updateSource = false))

  val merged = sourceHalves
    .joinWith(targetHalves, sourceHalves("identifier") === targetHalves("identifier"), "inner")
    .map(halves => ScholexplorerUtils.mergeScholixFlat(halves._1, halves._2))

  merged.write
    .mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(s"$outputPath/scholix")
}
|
||||||
|
|
||||||
def generateScholix(outputPath: String, spark: SparkSession): Unit = {
|
def generateScholix(outputPath: String, spark: SparkSession): Unit = {
|
||||||
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
|
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
|
||||||
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])
|
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])
|
||||||
|
|
|
@ -14,13 +14,15 @@ class ScholixGenerationTest {
|
||||||
|
|
||||||
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
|
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
|
||||||
val app = new SparkCreateScholexplorerDump(null, null, null)
|
val app = new SparkCreateScholexplorerDump(null, null, null)
|
||||||
// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
|
|
||||||
// app.generateBidirectionalRelations(
|
val basePath = "/Users/sandro/Downloads"
|
||||||
// "/home/sandro/Downloads/scholix_sample/",
|
app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
|
||||||
// "/home/sandro/Downloads/scholix/",
|
app.generateBidirectionalRelations(
|
||||||
// spark
|
s"$basePath/scholix_sample/",
|
||||||
// )
|
s"$basePath/scholix/",
|
||||||
app.generateScholix("/home/sandro/Downloads/scholix/", spark)
|
spark
|
||||||
|
)
|
||||||
|
app.generateFlatScholix(s"$basePath/scholix/", spark)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -960,7 +960,7 @@
|
||||||
<commons.logging.version>1.1.3</commons.logging.version>
|
<commons.logging.version>1.1.3</commons.logging.version>
|
||||||
<commons-validator.version>1.7</commons-validator.version>
|
<commons-validator.version>1.7</commons-validator.version>
|
||||||
<dateparser.version>1.0.7</dateparser.version>
|
<dateparser.version>1.0.7</dateparser.version>
|
||||||
<dhp-schemas.version>[6.1.2]</dhp-schemas.version>
|
<dhp-schemas.version>[6.1.3-FLAT-SCHOLIX]</dhp-schemas.version>
|
||||||
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
||||||
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
||||||
<dhp.guava.version>11.0.2</dhp.guava.version>
|
<dhp.guava.version>11.0.2</dhp.guava.version>
|
||||||
|
|
Loading…
Reference in New Issue