forked from antonis.lempesis/dnet-hadoop
generate first side of scholix mapping
This commit is contained in:
parent
e4b84ef5d6
commit
c952c8d236
|
@ -0,0 +1,66 @@
|
||||||
|
package eu.dnetlib.dhp.sx.graph
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
|
||||||
|
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
|
||||||
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
|
/**
  * Spark job entry point for the creation of Scholix records.
  *
  * In this revision the job only prepares its inputs: it loads the relations and the
  * Scholix summaries and keys both datasets by identifier. The join that would combine
  * them is still disabled, and nothing is written to `targetPath`.
  */
object SparkCreateScholix {

  /**
    * Parses the job arguments, creates the Spark session and defines the two keyed
    * input datasets.
    *
    * @param args command-line arguments, parsed against the `create_scholix_params.json`
    *             resource; the body reads `master`, `relationPath`, `summaryPath`
    *             and `targetPath`
    */
  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    val argumentSpec = IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/create_scholix_params.json"))
    val parser = new ArgumentApplicationParser(argumentSpec)
    parser.parseArgument(args)

    val sessionBuilder = SparkSession.builder().config(conf).appName(getClass.getSimpleName)
    val spark: SparkSession = sessionBuilder.master(parser.get("master")).getOrCreate()

    val relationPath = parser.get("relationPath")
    log.info(s"relationPath -> $relationPath")
    val summaryPath = parser.get("summaryPath")
    log.info(s"summaryPath -> $summaryPath")
    val targetPath = parser.get("targetPath")
    log.info(s"targetPath -> $targetPath")

    // Kryo encoders for the Java beans handled by the job.
    implicit val relationEncoder: Encoder[Relation] = Encoders.kryo[Relation]
    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]

    // Relations keyed by the identifier of their source.
    val keyedRelationEncoder = Encoders.tuple(Encoders.STRING, relationEncoder)
    val relationDS: Dataset[(String, Relation)] =
      spark.read.load(relationPath).as[Relation].map(rel => (rel.getSource, rel))(keyedRelationEncoder)

    // Summaries keyed by their own identifier.
    val keyedSummaryEncoder = Encoders.tuple(Encoders.STRING, summaryEncoder)
    val summaryDS: Dataset[(String, ScholixSummary)] =
      spark.read.load(summaryPath).as[ScholixSummary].map(summary => (summary.getId, summary))(keyedSummaryEncoder)

    // TODO: pending step, sketched but still disabled in this revision:
    //   relationDS.joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left")
    //   followed by a map of each (relation, source summary) pair to a Scholix record.
  }

}
|
|
@ -1,25 +1,203 @@
|
||||||
package eu.dnetlib.dhp.sx.graph.scholix
|
package eu.dnetlib.dhp.sx.graph.scholix
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Dataset, Result}
|
|
||||||
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, TypedIdentifier, Typology}
|
import eu.dnetlib.dhp.schema.oaf.{Dataset, Relation, Result, StructuredProperty}
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource}
|
||||||
|
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.io.Source
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
|
|
||||||
object ScholixUtils {
|
object ScholixUtils {
|
||||||
|
|
||||||
|
|
||||||
|
/** Schema label given to the identifiers of the datasources a record was collected from. */
val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier"

/** Key, compared ignoring case, of the relation property holding the relation date. */
val DATE_RELATION_KEY: String = "RelationDate"

/** Entry of the relation vocabulary: a relation name and the name of its inverse. */
case class RelationVocabulary(original: String, inverse: String)
|
||||||
|
|
||||||
|
/**
  * Relation vocabulary, read once from the classpath resource
  * `/eu/dnetlib/dhp/sx/graph/relations.json` when this object is initialised.
  *
  * The JSON document is extracted as a map from a relation key to its
  * [[RelationVocabulary]]; scholixFromSource looks entries up by the lower-cased
  * relation class.
  */
val relations: Map[String, RelationVocabulary] = {
  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

  val source = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json"))
  // Release the reader and its underlying stream as soon as the text has been read.
  val input = try source.mkString finally source.close()

  parse(input).extract[Map[String, RelationVocabulary]]
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Extracts the relation date stored among the properties of a relation.
  *
  * @param relation the relation to inspect
  * @return the value of the first property whose key equals DATE_RELATION_KEY ignoring
  *         case, or null when the relation has no properties or no such property
  */
def extractRelationDate(relation: Relation): String = {
  // The guard must bail out only when there is nothing to search: the previous
  // `!isEmpty` test returned null exactly when properties were present.
  if (relation.getProperties == null || relation.getProperties.isEmpty)
    null
  else
    relation.getProperties.asScala
      .find(p => DATE_RELATION_KEY.equalsIgnoreCase(p.getKey))
      .map(p => p.getValue)
      .orNull
}
|
||||||
|
|
||||||
|
/**
  * Extracts the date of a summary, used as fallback date of a relation.
  *
  * @param summary the summary to inspect
  * @return the first element of the summary date list, or null when the list is
  *         null or empty
  */
def extractRelationDate(summary: ScholixSummary): String = {
  // `||` instead of `&&`: a null list must short-circuit here (it used to raise a
  // NullPointerException) and an empty list must not reach get(0).
  if (summary.getDate == null || summary.getDate.isEmpty)
    null
  else
    summary.getDate.get(0)
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Converts the datasources of a summary into Scholix entity identifiers.
  *
  * @param summary the summary whose datasources are converted
  * @return one ScholixEntityId per datasource, named after the datasource and carrying
  *         its id under the DNET_IDENTIFIER_SCHEMA schema; an empty list when the
  *         summary has no datasources
  */
def extractCollectedFrom(summary: ScholixSummary): List[ScholixEntityId] = {
  if (summary.getDatasources != null && !summary.getDatasources.isEmpty)
    summary.getDatasources.asScala.map { d =>
      // Shared constant instead of a repeated literal, as in the Relation overload.
      new ScholixEntityId(d.getDatasourceName, List(new ScholixIdentifier(d.getDatasourceId, DNET_IDENTIFIER_SCHEMA, null)).asJava)
    }.toList
  else
    List()
}
|
||||||
|
|
||||||
|
/**
  * Converts the `collectedfrom` entries of a relation into Scholix entity identifiers.
  *
  * @param relation the relation whose `collectedfrom` entries are converted
  * @return one ScholixEntityId per entry, named after the entry value and carrying the
  *         entry key under the DNET_IDENTIFIER_SCHEMA schema; an empty list when the
  *         relation has no `collectedfrom` entries
  */
def extractCollectedFrom(relation: Relation): List[ScholixEntityId] = {
  val collectedFrom = relation.getCollectedfrom
  if (collectedFrom == null || collectedFrom.isEmpty)
    List()
  else
    collectedFrom.asScala.map { entry =>
      val name = entry.getValue
      val identifier = new ScholixIdentifier(entry.getKey, DNET_IDENTIFIER_SCHEMA, null)
      new ScholixEntityId(name, List(identifier).asJava)
    }.toList
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Builds the Scholix resource describing a summary.
  *
  * Identifier, dnet identifier, object type and sub type are always copied. Title,
  * creators, publication date, publishers and collectedFrom are set only when the
  * corresponding list of the summary is neither null nor empty; title and publication
  * date take the first element of their list.
  *
  * @param summaryObject the summary to convert; its typology must not be null
  * @return the populated ScholixResource
  */
def generateScholixResourceFromSummary(summaryObject: ScholixSummary): ScholixResource = {
  // Keeps a Java list only when it actually has content.
  def filled[T](values: java.util.List[T]): Option[java.util.List[T]] =
    Option(values).filterNot(_.isEmpty)

  val resource = new ScholixResource
  resource.setIdentifier(summaryObject.getLocalIdentifier)
  resource.setDnetIdentifier(summaryObject.getId)
  resource.setObjectType(summaryObject.getTypology.toString)
  resource.setObjectSubType(summaryObject.getSubType)

  filled(summaryObject.getTitle).foreach(titles => resource.setTitle(titles.get(0)))

  filled(summaryObject.getAuthor).foreach { authors =>
    val creators: List[ScholixEntityId] = authors.asScala.map(name => new ScholixEntityId(name, null)).toList
    resource.setCreator(creators.asJava)
  }

  filled(summaryObject.getDate).foreach(dates => resource.setPublicationDate(dates.get(0)))

  filled(summaryObject.getPublisher).foreach { names =>
    val publishers: List[ScholixEntityId] = names.asScala.map(name => new ScholixEntityId(name, null)).toList
    resource.setPublisher(publishers.asJava)
  }

  filled(summaryObject.getDatasources).foreach { datasources =>
    val collectedFrom: List[ScholixCollectedFrom] = datasources.asScala.map { ds =>
      val provider = new ScholixEntityId(ds.getDatasourceName, List(new ScholixIdentifier(ds.getDatasourceId, DNET_IDENTIFIER_SCHEMA, null)).asJava)
      new ScholixCollectedFrom(provider, "collected", "complete")
    }.toList
    resource.setCollectedFrom(collectedFrom.asJava)
  }

  resource
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Builds the source side of a Scholix record from a relation and the summary of the
  * relation source. The target of the record is not set here.
  *
  * Link providers are taken from the relation and, when it has none, from the
  * datasources of the summary. The publication date is the relation date, falling
  * back to the first date of the summary.
  *
  * @param relation the relation to convert
  * @param source   the summary of the source of the relation
  * @return the partially populated Scholix, or null when either argument is null,
  *         no link provider can be determined, or the relation class is null or
  *         not present in the relation vocabulary
  */
def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = {
  if (relation == null || source == null)
    return null

  val fromRelation: List[ScholixEntityId] = extractCollectedFrom(relation)
  val linkProviders: List[ScholixEntityId] =
    if (fromRelation.nonEmpty) fromRelation else extractCollectedFrom(source)
  if (linkProviders.isEmpty)
    return null

  // A relation without a class cannot be looked up: it is handled like an unknown
  // relation class instead of failing with a NullPointerException.
  val vocabularyEntry: Option[RelationVocabulary] =
    Option(relation.getRelClass).flatMap(relClass => relations.get(relClass.toLowerCase))

  vocabularyEntry match {
    case None => null
    case Some(semanticRelation) =>
      val s = new Scholix
      s.setLinkprovider(linkProviders.asJava)

      val relationDate = extractRelationDate(relation)
      s.setPublicationDate(if (relationDate != null) relationDate else extractRelationDate(source))

      if (source.getPublisher != null && !source.getPublisher.isEmpty) {
        val publishers: List[ScholixEntityId] =
          source.getPublisher.asScala.map(p => new ScholixEntityId(p, null)).toList
        if (publishers.nonEmpty)
          s.setPublisher(publishers.asJava)
      }

      s.setRelationship(new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse))
      s.setSource(generateScholixResourceFromSummary(source))
      s
  }
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Pairs every pid with the first URL that contains the pid value, ignoring case.
  *
  * @param pidValue the pids to resolve
  * @param urls     the candidate URLs; null entries are ignored
  * @return one pair per pid, in input order; the second element is null when the pid
  *         has no value or no URL contains it
  */
def findURLforPID(pidValue: List[StructuredProperty], urls: List[String]): List[(StructuredProperty, String)] = {
  pidValue.map { p =>
    // Lower-case the pid once rather than once per candidate URL; a pid without a
    // value simply matches nothing instead of raising a NullPointerException.
    val matchingUrl = Option(p.getValue).map(_.toLowerCase).flatMap { needle =>
      urls.find(u => u != null && u.toLowerCase.contains(needle))
    }
    (p, matchingUrl.orNull)
  }
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
  * Collects the identifiers of a result from the pids of its instances, attaching to
  * each pid the instance URL that contains it (see findURLforPID).
  *
  * Only instances that have at least one URL and a pid list are considered.
  *
  * @param r the result to inspect
  * @return the distinct identifiers found, each with value, qualifier class id as
  *         schema and matching URL (null when no URL matches); an empty list when
  *         the result has no instances
  */
def extractTypedIdentifierFromInstance(r: Result): List[ScholixIdentifier] = {
  if (r.getInstance() == null || r.getInstance().isEmpty)
    List()
  else
    r.getInstance().asScala
      .filter(i => i.getUrl != null && !i.getUrl.isEmpty)
      // An instance may expose URLs without any pid list: skip it rather than
      // dereferencing a null list below.
      .filter(i => i.getPid != null)
      .flatMap(i => findURLforPID(i.getPid.asScala.toList, i.getUrl.asScala.toList))
      .map(i => new ScholixIdentifier(i._1.getValue, i._1.getQualifier.getClassid, i._2))
      .distinct
      .toList
}
|
||||||
|
|
||||||
def resultToSummary(r:Result):ScholixSummary = {
|
def resultToSummary(r:Result):ScholixSummary = {
|
||||||
val s = new ScholixSummary
|
val s = new ScholixSummary
|
||||||
s.setId(r.getId)
|
s.setId(r.getId)
|
||||||
if (r.getPid == null || r.getPid.isEmpty)
|
if (r.getPid == null || r.getPid.isEmpty)
|
||||||
return null
|
return null
|
||||||
|
|
||||||
val pids:List[TypedIdentifier] = r.getPid.asScala.map(p => new TypedIdentifier(p.getValue, p.getQualifier.getClassid))(collection breakOut)
|
val pids:List[ScholixIdentifier] = extractTypedIdentifierFromInstance(r)
|
||||||
|
if (pids.isEmpty)
|
||||||
|
return null
|
||||||
s.setLocalIdentifier(pids.asJava)
|
s.setLocalIdentifier(pids.asJava)
|
||||||
|
|
||||||
s.getLocalIdentifier.isEmpty
|
|
||||||
|
|
||||||
if (r.isInstanceOf[Dataset])
|
if (r.isInstanceOf[Dataset])
|
||||||
s.setTypology(Typology.dataset)
|
s.setTypology(Typology.dataset)
|
||||||
else
|
else
|
||||||
|
@ -43,7 +221,7 @@ object ScholixUtils {
|
||||||
if (r.getInstance() != null) {
|
if (r.getInstance() != null) {
|
||||||
val dt:List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue)(collection.breakOut)
|
val dt:List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue)(collection.breakOut)
|
||||||
if (dt.nonEmpty)
|
if (dt.nonEmpty)
|
||||||
s.setDate(dt.asJava)
|
s.setDate(dt.distinct.asJava)
|
||||||
}
|
}
|
||||||
if (r.getDescription!= null && !r.getDescription.isEmpty) {
|
if (r.getDescription!= null && !r.getDescription.isEmpty) {
|
||||||
val d = r.getDescription.asScala.find(f => f.getValue!=null)
|
val d = r.getDescription.asScala.find(f => f.getValue!=null)
|
||||||
|
@ -63,7 +241,7 @@ object ScholixUtils {
|
||||||
if (r.getCollectedfrom!= null && !r.getCollectedfrom.isEmpty) {
|
if (r.getCollectedfrom!= null && !r.getCollectedfrom.isEmpty) {
|
||||||
val cf:List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete"))(collection breakOut)
|
val cf:List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete"))(collection breakOut)
|
||||||
if (cf.nonEmpty)
|
if (cf.nonEmpty)
|
||||||
s.setDatasources(cf.asJava)
|
s.setDatasources(cf.distinct.asJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.setRelatedDatasets(0)
|
s.setRelatedDatasets(0)
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
package eu.dnetlib.dhp.sx.graph.scholix
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
|
||||||
|
import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF
|
||||||
|
import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.ScholixResolved
|
||||||
|
import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest
|
||||||
|
import org.json4s
|
||||||
|
import org.json4s.DefaultFormats
|
||||||
|
import org.json4s.JsonAST.{JField, JObject, JString}
|
||||||
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith
|
||||||
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension
|
||||||
|
|
||||||
|
import java.io.{BufferedReader, InputStream, InputStreamReader}
|
||||||
|
import java.util.zip.GZIPInputStream
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.io.Source
|
||||||
|
import scala.xml.pull.XMLEventReader
|
||||||
|
|
||||||
|
/** Tests for the summary conversion and the relation vocabulary of ScholixUtils. */
@ExtendWith(Array(classOf[MockitoExtension]))
class ScholixGraphTest extends AbstractVocabularyTest {

  val mapper: ObjectMapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  @BeforeEach
  def setUp(): Unit = {
    super.setUpVocabulary()
  }

  /**
    * Reads a classpath resource line by line and closes the reader afterwards.
    * Using Source.getLines avoids `String.lines`, which resolves to the Java method
    * on JDK 11 and later.
    */
  private def readResourceLines(path: String): List[String] = {
    val stream = getClass.getResourceAsStream(path)
    assertNotNull(stream, s"missing test resource $path")
    val source = Source.fromInputStream(stream)
    try source.getLines().toList finally source.close()
  }

  /** Every serialized Result of the fixture must become a summary whose local identifiers all carry a URL. */
  @Test
  def testOAFToSummary(): Unit = {
    val items = readResourceLines("/eu/dnetlib/dhp/sx/graph/oaf_to_summary")
    assertTrue(items.nonEmpty)
    items.foreach(i => assertTrue(i.nonEmpty))

    val result = items.map(r => mapper.readValue(r, classOf[Result])).map(i => ScholixUtils.resultToSummary(i))

    // JUnit expects (expected, actual).
    assertEquals(items.size, result.size)
    // resultToSummary signals a rejected record with null: report it as an assertion
    // failure instead of a NullPointerException in the check below.
    result.foreach(s => assertNotNull(s))

    val d = result.find(s => s.getLocalIdentifier.asScala.exists(i => i.getUrl == null || i.getUrl.isEmpty))
    assertFalse(d.isDefined)

    println(mapper.writeValueAsString(result.head))
  }

  /** Every relation name of the fixture must be a key of the relation vocabulary once lower-cased. */
  @Test
  def testScholixRelationshipsClean(): Unit = {
    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

    val inputRelations = readResourceLines("/eu/dnetlib/dhp/sx/graph/relation_transform.json").mkString("\n")
    val json: json4s.JValue = parse(inputRelations)
    val l: List[String] = json.extract[List[String]]
    assertTrue(l.nonEmpty)

    val relVocabulary = ScholixUtils.relations
    l.foreach(r => assertTrue(relVocabulary.contains(r.toLowerCase), s"relation '$r' is missing from the vocabulary"))
  }

}
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,26 @@
|
||||||
|
["References",
|
||||||
|
"isRelatedTo",
|
||||||
|
"IsSupplementTo",
|
||||||
|
"IsPartOf",
|
||||||
|
"IsVersionOf",
|
||||||
|
"HasVersion",
|
||||||
|
"IsReferencedBy",
|
||||||
|
"HasPart",
|
||||||
|
"IsIdenticalTo",
|
||||||
|
"IsPreviousVersionOf",
|
||||||
|
"Continues",
|
||||||
|
"IsContinuedBy",
|
||||||
|
"Cites",
|
||||||
|
"IsDocumentedBy",
|
||||||
|
"IsCitedBy",
|
||||||
|
"IsNewVersionOf",
|
||||||
|
"IsDerivedFrom",
|
||||||
|
"IsVariantFormOf",
|
||||||
|
"IsObsoletedBy",
|
||||||
|
"Reviews",
|
||||||
|
"IsSupplementedBy",
|
||||||
|
"Documents",
|
||||||
|
"IsCompiledBy",
|
||||||
|
"IsSourceOf",
|
||||||
|
"Compiles",
|
||||||
|
"IsReviewedBy"]
|
2
pom.xml
2
pom.xml
|
@ -736,7 +736,7 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
<dhp-schemas.version>[2.6.14]</dhp-schemas.version>
|
<dhp-schemas.version>[2.7.14-SNAPSHOT]</dhp-schemas.version>
|
||||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||||
|
|
Loading…
Reference in New Issue