forked from D-Net/dnet-hadoop

fixed scholexplorer bug

commit 6efab4d88e
parent db358ad0d2
@@ -389,7 +389,7 @@ object ScholixUtils extends Serializable {
     if (persistentIdentifiers.isEmpty)
       return null
     s.setLocalIdentifier(persistentIdentifiers.asJava)
-    s.setTypology(r.getResulttype.getClassid)
+    // s.setTypology(r.getResulttype.getClassid)

     s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)

@@ -31,86 +31,86 @@
             <artifactId>dhp-actionmanager</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-aggregation</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-aggregation</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-blacklist</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-blacklist</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-broker-events</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-broker-events</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-dedup-openaire</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-dedup-openaire</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-enrichment</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-enrichment</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
         <dependency>
             <groupId>eu.dnetlib.dhp</groupId>
             <artifactId>dhp-graph-mapper</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-graph-provision</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-graph-provision</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-impact-indicators</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-impact-indicators</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-stats-actionsets</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-stats-actionsets</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-stats-hist-snaps</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-stats-hist-snaps</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-stats-monitor-irish</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-stats-monitor-irish</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-stats-promote</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-stats-promote</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-stats-update</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-stats-update</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-swh</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-swh</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-usage-raw-data-update</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-usage-raw-data-update</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
-        <dependency>
-            <groupId>eu.dnetlib.dhp</groupId>
-            <artifactId>dhp-usage-stats-build</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>eu.dnetlib.dhp</groupId>-->
+<!--            <artifactId>dhp-usage-stats-build</artifactId>-->
+<!--            <version>${project.version}</version>-->
+<!--        </dependency>-->
     </dependencies>

@@ -1,14 +1,8 @@
 package eu.dnetlib.dhp.sx.graph

+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
-import eu.dnetlib.dhp.schema.sx.scholix.{
-  Scholix,
-  ScholixCollectedFrom,
-  ScholixEntityId,
-  ScholixIdentifier,
-  ScholixRelationship,
-  ScholixResource
-}
+import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource}
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse

@@ -28,6 +22,7 @@ case class RelKeyValue(key: String, value: String) {}
 object ScholexplorerUtils {

   val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
+  val mapper = new ObjectMapper()

   case class RelationVocabulary(original: String, inverse: String) {}

@@ -242,7 +237,7 @@ object ScholexplorerUtils {
     s
   }

-  def updateTarget(s: Scholix, t: ScholixResource): Scholix = {
+  def updateTarget(s: Scholix, t: ScholixResource): String = {

     s.setTarget(t)
     val spublishers: Seq[ScholixEntityId] =
@@ -251,6 +246,6 @@ object ScholexplorerUtils {
       if (t.getPublisher != null && !t.getPublisher.isEmpty) t.getPublisher.asScala else List()
     val mergedPublishers = spublishers.union(tpublishers).distinct.take(10).toList
     s.setPublisher(mergedPublishers.asJava)
-    s
+    mapper.writeValueAsString(s)
   }
 }

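Taken together, the three hunks above change updateTarget's contract: it still merges the target's publishers into the Scholix record, but now returns the record serialized to a JSON string by the shared Jackson ObjectMapper instead of the bean itself. Below is a minimal, self-contained sketch of that serialize-on-return pattern; the Link bean and its fields are hypothetical stand-ins for the real Scholix model, not part of this commit.

import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical stand-in for Scholix: Jackson serializes any bean-style
// class through its zero-argument getters.
class Link(private var identifier: String, private var relClass: String) {
  def getIdentifier: String = identifier
  def getRelClass: String = relClass
  def setRelClass(r: String): Unit = relClass = r
}

object JsonLineSketch {

  private val mapper = new ObjectMapper()

  // Mirrors the new contract: mutate the record, then hand back its JSON
  // form so the caller can write it as one line of a plain-text dump.
  def updateAndSerialize(l: Link): String = {
    l.setRelClass(l.getRelClass.toLowerCase)
    mapper.writeValueAsString(l)
  }

  def main(args: Array[String]): Unit =
    println(updateAndSerialize(new Link("scholix::1", "References")))
}

Returning the string rather than the bean is what later allows the Spark job to write the dataset with a plain text sink, as the provision hunks below show.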
@@ -11,7 +11,7 @@ import eu.dnetlib.dhp.schema.oaf.{
   Dataset => OafDataset
 }
 import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
-import org.apache.spark.sql.functions.{col, concat, expr, md5}
+import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
@@ -89,7 +89,13 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
       .withColumn("cf", expr("transform(collectedfrom, x -> struct(x.key, x.value))"))
       .drop("collectedfrom")
       .withColumnRenamed("cf", "collectedfrom")
-      .distinct()
+      .groupBy(col("id"))
+      .agg(
+        first("source").alias("source"),
+        first("target").alias("target"),
+        first("relClass").alias("relClass"),
+        first("collectedfrom").alias("collectedfrom")
+      )

     bidRel.write.mode(SaveMode.Overwrite).save(s"$otuputPath/relation")
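The replacement of .distinct() is worth spelling out: grouping by id and taking first() of every other column deduplicates relations by identifier alone, so two rows that share an id but differ in some other column (for example the collectedfrom payload) collapse into one row, where distinct() would have kept both. A self-contained sketch of the same pattern on a toy schema; the column names match the hunk, the data is invented.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, first}

object DedupByIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._

    // Two rows share id "r1" but differ in relClass, so distinct()
    // would keep both; groupBy + first keeps exactly one row per id.
    val rels = Seq(
      ("r1", "s1", "t1", "IsRelatedTo"),
      ("r1", "s1", "t1", "isRelatedTo"),
      ("r2", "s2", "t2", "References")
    ).toDF("id", "source", "target", "relClass")

    rels
      .groupBy(col("id"))
      .agg(
        first("source").alias("source"),
        first("target").alias("target"),
        first("relClass").alias("relClass")
      )
      .show() // one row per id

    spark.stop()
  }
}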
@@ -97,27 +103,32 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo

   def generateScholix(outputPath: String, spark: SparkSession): Unit = {
     implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
-    implicit val scholixEncoder: Encoder[Scholix] = Encoders.bean(classOf[Scholix])
+    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])

     import spark.implicits._
     val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
     val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]

     val scholix_one_verse = relations
       .joinWith(resource, relations("source") === resource("dnetIdentifier"), "inner")
       .map(res => ScholexplorerUtils.generateScholix(res._1, res._2))
+      .map(s=> (s.getIdentifier, s))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[Scholix])))

     val resourceTarget = relations
       .joinWith(resource, relations("target") === resource("dnetIdentifier"), "inner")
       .map(res => (res._1.id, res._2))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[ScholixResource])))

     scholix_one_verse
-      .joinWith(resourceTarget, scholix_one_verse("identifier") === resourceTarget("_1"), "inner")
-      .map(k => ScholexplorerUtils.updateTarget(k._1, k._2._2))
+      .joinWith(resourceTarget, scholix_one_verse("_1") === resourceTarget("_1"), "inner")
+      .map(k => ScholexplorerUtils.updateTarget(k._1._2, k._2._2))
       .write
       .mode(SaveMode.Overwrite)
       .option("compression", "gzip")
-      .json(s"$outputPath/scholix")
+      .text(s"$outputPath/scholix")
   }
 }

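The remaining edits in this hunk all follow from switching the Scholix encoder from Encoders.bean to Encoders.kryo: a kryo-encoded Dataset is a single opaque binary column, so the identifier field can no longer be referenced in a join condition. The fix pairs each record with its key in a (String, Scholix) tuple and joins on the tuple's _1 column; updateTarget then receives k._1._2, and since it now returns a JSON string, the output is written with .text() instead of .json(). A sketch of the keyed-tuple join under the same assumptions, with a hypothetical Payload class standing in for Scholix:

import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Hypothetical stand-in for a class that only has a kryo encoder.
case class Payload(identifier: String, body: String)

object KryoJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kryo-join-sketch").getOrCreate()
    import spark.implicits._

    val payloadEnc = Encoders.kryo(classOf[Payload])
    val payloads: Dataset[Payload] =
      spark.createDataset(Seq(Payload("a", "alpha"), Payload("b", "beta")))(payloadEnc)

    // A kryo dataset is one opaque binary column, so expose the join key
    // explicitly as _1 of a (String, Payload) tuple, as the hunk above does.
    val keyed = payloads.map(p => (p.identifier, p))(
      Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[Payload]))
    )

    val targets = Seq(("a", "target-A"), ("b", "target-B")).toDS()

    keyed
      .joinWith(targets, keyed("_1") === targets("_1"), "inner")
      .map(k => s"${k._1._2.body} -> ${k._2._2}")
      .show()

    spark.stop()
  }
}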
@@ -1,17 +1,28 @@
 package eu.dnetlib.dhp.sx.graph.scholix

+import eu.dnetlib.dhp.schema.sx.scholix.ScholixResource
 import eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.junit.jupiter.api.Test
+import org.objenesis.strategy.StdInstantiatorStrategy

 class ScholixGenerationTest {

   @Test
   def generateScholix(): Unit = {

     val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
     val app = new SparkCreateScholexplorerDump(null, null, null)
     // app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
-    // app.generateBidirectionalRelations("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
+    // app.generateBidirectionalRelations(
+    //   "/home/sandro/Downloads/scholix_sample/",
+    //   "/home/sandro/Downloads/scholix/",
+    //   spark
+    // )
     app.generateScholix("/home/sandro/Downloads/scholix/", spark)

   }
 }
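The test's new imports (SparkConf, Encoders, StdInstantiatorStrategy) are not used in the lines shown; they usually accompany kryo-serializer wiring for a local session. Purely as a sketch of what such a setup tends to look like, an assumption on my part rather than something this commit is shown to contain:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object LocalKryoSessionSketch {
  def main(args: Array[String]): Unit = {
    // Assumed configuration: route Spark serialization through kryo,
    // the usual companion of Encoders.kryo in test code.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val spark = SparkSession
      .builder()
      .config(conf)
      .master("local[*]")
      .getOrCreate()

    println(spark.version)
    spark.stop()
  }
}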