âÃMerge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
f9fc64ffaf
|
@ -4,9 +4,12 @@ import java.time.LocalDateTime
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.PacePerson
|
import eu.dnetlib.dhp.common.PacePerson
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, Field, Instance, KeyValue, Publication, Qualifier, Relation, StructuredProperty}
|
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
@ -77,6 +80,76 @@ object DLIToOAF {
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
val rel_inverse: Map[String, String] = Map(
|
||||||
|
"isRelatedTo" -> "isRelatedTo",
|
||||||
|
"IsSupplementedBy" -> "isSupplementTo",
|
||||||
|
"cites" -> "IsCitedBy",
|
||||||
|
"IsCitedBy" -> "cites",
|
||||||
|
"reviews" -> "IsReviewedBy"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
val PidTypeMap: Map[String, String] = Map(
|
||||||
|
"pbmid" -> "pmid",
|
||||||
|
"pmcid" -> "pmc",
|
||||||
|
"pmid" -> "pmid",
|
||||||
|
"pubmedid" -> "pmid",
|
||||||
|
"DOI" -> "doi",
|
||||||
|
"doi" -> "doi"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def toActionSet(item: Oaf): (String, String) = {
|
||||||
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
item match {
|
||||||
|
case dataset: Dataset =>
|
||||||
|
val a: AtomicAction[Dataset] = new AtomicAction[Dataset]
|
||||||
|
a.setClazz(classOf[Dataset])
|
||||||
|
a.setPayload(dataset)
|
||||||
|
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||||
|
case publication: Publication =>
|
||||||
|
val a: AtomicAction[Publication] = new AtomicAction[Publication]
|
||||||
|
a.setClazz(classOf[Publication])
|
||||||
|
a.setPayload(publication)
|
||||||
|
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||||
|
case relation: Relation =>
|
||||||
|
val a: AtomicAction[Relation] = new AtomicAction[Relation]
|
||||||
|
a.setClazz(classOf[Relation])
|
||||||
|
a.setPayload(relation)
|
||||||
|
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||||
|
case _ =>
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def convertClinicalTrial(dataset: DLIDataset): (String, String) = {
|
||||||
|
val currentId = generateId(dataset.getId)
|
||||||
|
val pids = dataset.getPid.asScala.filter(p => "clinicaltrials.gov".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => s"50|r3111dacbab5::${DHPUtils.md5(p.getValue.toLowerCase())}")
|
||||||
|
if (pids.isEmpty)
|
||||||
|
null
|
||||||
|
else
|
||||||
|
(currentId, pids.head)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def insertExternalRefs(publication: Publication, externalReferences: List[DLIExternalReference]): Publication = {
|
||||||
|
|
||||||
|
val eRefs = externalReferences.map(e => {
|
||||||
|
val result = new ExternalReference()
|
||||||
|
result.setSitename(e.sitename)
|
||||||
|
result.setLabel(e.label)
|
||||||
|
result.setUrl(e.url)
|
||||||
|
result.setRefidentifier(e.pid)
|
||||||
|
result.setDataInfo(generateDataInfo())
|
||||||
|
result.setQualifier(createQualifier(e.classId, "dnet:externalReference_typologies"))
|
||||||
|
result
|
||||||
|
})
|
||||||
|
publication.setExternalReference(eRefs.asJava)
|
||||||
|
publication
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
def filterPid(p: StructuredProperty): Boolean = {
|
def filterPid(p: StructuredProperty): Boolean = {
|
||||||
if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url"))
|
if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url"))
|
||||||
if (filteredURL.exists(u => p.getValue.contains(u)))
|
if (filteredURL.exists(u => p.getValue.contains(u)))
|
||||||
|
@ -97,7 +170,6 @@ object DLIToOAF {
|
||||||
}
|
}
|
||||||
|
|
||||||
def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = {
|
def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = {
|
||||||
val currentId = generateId(dataset.getId)
|
|
||||||
val pids = dataset.getPid.asScala.filter(filterPid)
|
val pids = dataset.getPid.asScala.filter(filterPid)
|
||||||
|
|
||||||
if (pids == null || pids.isEmpty)
|
if (pids == null || pids.isEmpty)
|
||||||
|
@ -126,43 +198,50 @@ object DLIToOAF {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def convertDLIPublicationToOAF(p: DLIPublication): Publication = {
|
def convertDLIPublicationToOAF(inputPublication: DLIPublication): Publication = {
|
||||||
|
|
||||||
val result = new Publication
|
val result = new Publication
|
||||||
result.setId(generateId(p.getId))
|
val cleanedPids = inputPublication.getPid.asScala.filter(p => PidTypeMap.contains(p.getQualifier.getClassid))
|
||||||
|
.map(p => {
|
||||||
|
p.setQualifier(createQualifier(PidTypeMap(p.getQualifier.getClassid), p.getQualifier.getSchemeid))
|
||||||
|
p
|
||||||
|
})
|
||||||
|
if (cleanedPids.isEmpty)
|
||||||
|
return null
|
||||||
|
result.setId(generateId(inputPublication.getId))
|
||||||
result.setDataInfo(generateDataInfo(invisibile = true))
|
result.setDataInfo(generateDataInfo(invisibile = true))
|
||||||
if (p.getCollectedfrom == null || p.getCollectedfrom.size() == 0 || (p.getCollectedfrom.size() == 1 && p.getCollectedfrom.get(0) == null))
|
if (inputPublication.getCollectedfrom == null || inputPublication.getCollectedfrom.size() == 0 || (inputPublication.getCollectedfrom.size() == 1 && inputPublication.getCollectedfrom.get(0) == null))
|
||||||
return null
|
return null
|
||||||
|
result.setCollectedfrom(inputPublication.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
|
||||||
result.setCollectedfrom(p.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava)
|
if(result.getCollectedfrom.isEmpty)
|
||||||
result.setPid(p.getPid)
|
return null
|
||||||
result.setDateofcollection(p.getDateofcollection)
|
result.setPid(cleanedPids.asJava)
|
||||||
result.setOriginalId(p.getPid.asScala.map(p => p.getValue).asJava)
|
result.setDateofcollection(inputPublication.getDateofcollection)
|
||||||
|
result.setOriginalId(inputPublication.getPid.asScala.map(p => p.getValue).asJava)
|
||||||
result.setDateoftransformation(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")))
|
result.setDateoftransformation(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")))
|
||||||
if (p.getAuthor == null || p.getAuthor.isEmpty)
|
if (inputPublication.getAuthor == null || inputPublication.getAuthor.isEmpty)
|
||||||
return null
|
return null
|
||||||
result.setAuthor(p.getAuthor.asScala.map(convertAuthor).asJava)
|
result.setAuthor(inputPublication.getAuthor.asScala.map(convertAuthor).asJava)
|
||||||
result.setResulttype(createQualifier(p.getResulttype.getClassid, p.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies"))
|
result.setResulttype(createQualifier(inputPublication.getResulttype.getClassid, inputPublication.getResulttype.getClassname, "dnet:result_typologies", "dnet:result_typologies"))
|
||||||
|
|
||||||
if (p.getSubject != null)
|
if (inputPublication.getSubject != null)
|
||||||
result.setSubject(p.getSubject.asScala.map(convertSubject).asJava)
|
result.setSubject(inputPublication.getSubject.asScala.map(convertSubject).asJava)
|
||||||
|
|
||||||
if (p.getTitle == null || p.getTitle.isEmpty)
|
if (inputPublication.getTitle == null || inputPublication.getTitle.isEmpty)
|
||||||
return null
|
return null
|
||||||
|
|
||||||
result.setTitle(List(patchTitle(p.getTitle.get(0))).asJava)
|
result.setTitle(List(patchTitle(inputPublication.getTitle.get(0))).asJava)
|
||||||
|
|
||||||
if (p.getRelevantdate == null || p.getRelevantdate.size() == 0)
|
if (inputPublication.getRelevantdate == null || inputPublication.getRelevantdate.size() == 0)
|
||||||
return null
|
return null
|
||||||
|
|
||||||
result.setRelevantdate(p.getRelevantdate.asScala.map(patchRelevantDate).asJava)
|
result.setRelevantdate(inputPublication.getRelevantdate.asScala.map(patchRelevantDate).asJava)
|
||||||
|
|
||||||
|
|
||||||
result.setDescription(p.getDescription)
|
result.setDescription(inputPublication.getDescription)
|
||||||
|
|
||||||
result.setDateofacceptance(asField(p.getRelevantdate.get(0).getValue))
|
result.setDateofacceptance(asField(inputPublication.getRelevantdate.get(0).getValue))
|
||||||
result.setPublisher(p.getPublisher)
|
result.setPublisher(inputPublication.getPublisher)
|
||||||
result.setSource(p.getSource)
|
result.setSource(inputPublication.getSource)
|
||||||
result.setBestaccessright(createQualifier("UNKNOWN", "not available", "dnet:access_modes", "dnet:access_modes"))
|
result.setBestaccessright(createQualifier("UNKNOWN", "not available", "dnet:access_modes", "dnet:access_modes"))
|
||||||
|
|
||||||
val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue)
|
val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue)
|
||||||
|
@ -170,7 +249,7 @@ object DLIToOAF {
|
||||||
return null
|
return null
|
||||||
|
|
||||||
|
|
||||||
val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(p.getInstance()), result.getDateofacceptance)
|
val i: Instance = createInstance(s"https://dx.doi.org/${dois.head}", firstInstanceOrNull(inputPublication.getInstance()), result.getDateofacceptance)
|
||||||
|
|
||||||
if (i != null)
|
if (i != null)
|
||||||
result.setInstance(List(i).asJava)
|
result.setInstance(List(i).asJava)
|
||||||
|
@ -211,7 +290,9 @@ object DLIToOAF {
|
||||||
val result: Dataset = new Dataset
|
val result: Dataset = new Dataset
|
||||||
result.setId(generateId(d.getId))
|
result.setId(generateId(d.getId))
|
||||||
result.setDataInfo(generateDataInfo())
|
result.setDataInfo(generateDataInfo())
|
||||||
result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).asJava)
|
result.setCollectedfrom(d.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
|
||||||
|
if(result.getCollectedfrom.isEmpty)
|
||||||
|
return null
|
||||||
|
|
||||||
|
|
||||||
result.setPid(d.getPid)
|
result.setPid(d.getPid)
|
||||||
|
@ -280,7 +361,7 @@ object DLIToOAF {
|
||||||
if (dataset)
|
if (dataset)
|
||||||
i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
|
i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
|
||||||
else
|
else
|
||||||
i.setInstancetype(createQualifier("0000", "UNKNOWN", "dnet:publication_resource", "dnet:publication_resource"))
|
i.setInstancetype(createQualifier("0000", "Unknown", "dnet:publication_resource", "dnet:publication_resource"))
|
||||||
if (originalInstance != null && originalInstance.getHostedby != null)
|
if (originalInstance != null && originalInstance.getHostedby != null)
|
||||||
i.setHostedby(originalInstance.getHostedby)
|
i.setHostedby(originalInstance.getHostedby)
|
||||||
|
|
||||||
|
|
|
@ -4,10 +4,16 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
|
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.hadoop.io.Text
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
import org.apache.spark.sql.functions._
|
||||||
|
import org.apache.spark.sql.expressions.Window
|
||||||
import org.apache.spark.{SparkConf, SparkContext}
|
import org.apache.spark.{SparkConf, SparkContext}
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
import org.codehaus.jackson.map.ObjectMapper
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,57 +42,66 @@ object SparkExportContentForOpenAire {
|
||||||
implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
|
implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
|
|
||||||
//
|
|
||||||
// val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
|
val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
|
||||||
// .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
|
.map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
|
||||||
// .filter(p => p.getDataInfo.getDeletedbyinference == false)
|
.filter(p => p.getDataInfo.getDeletedbyinference == false)
|
||||||
// .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
|
.map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
|
||||||
// spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
|
spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
|
||||||
//
|
|
||||||
// val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
|
val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
|
||||||
// .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
|
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
|
||||||
// .filter(p => p.getDataInfo.getDeletedbyinference == false)
|
.filter(p => p.getDataInfo.getDeletedbyinference == false)
|
||||||
// .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
|
.map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
|
||||||
// spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
|
spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
|
||||||
//
|
|
||||||
//
|
|
||||||
// val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
|
val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
|
||||||
// .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
|
.map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
|
||||||
// .filter(p => p.getDataInfo.getDeletedbyinference == false)
|
.filter(p => p.getDataInfo.getDeletedbyinference == false)
|
||||||
// .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
|
.map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
|
||||||
// spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
|
spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
|
val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
|
||||||
// val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
|
val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
|
||||||
var relDS :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
|
val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
|
||||||
//
|
|
||||||
//
|
|
||||||
// pubs.joinWith(relDS, pubs("id").equalTo(relDS("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
|
val pub_id = pubs.select("id").distinct()
|
||||||
//
|
val dat_id = dats.select("id").distinct()
|
||||||
// relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
|
|
||||||
//
|
|
||||||
// relDS.joinWith(dats, relDS("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
|
pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
|
||||||
//
|
|
||||||
//
|
val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
|
||||||
// val r_source = relDS.select(relDS("source")).distinct()
|
|
||||||
// val r_target = relDS.select(relDS("source")).distinct()
|
relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
|
||||||
//
|
|
||||||
//
|
|
||||||
// pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
|
val r_source = relDS2.select(relDS2("source")).distinct()
|
||||||
//
|
val r_target = relDS2.select(relDS2("target")).distinct()
|
||||||
// dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS_filtered")
|
|
||||||
//
|
|
||||||
// spark.createDataset(sc.textFile(s"$workingPath/dataset")
|
val w2 = Window.partitionBy("id").orderBy("lastupdatetimestamp")
|
||||||
// .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
|
|
||||||
// .map(DLIToOAF.convertDLIDatasetToExternalReference)
|
pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1)
|
||||||
// .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
|
.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
|
||||||
//
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
|
||||||
|
|
||||||
|
dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1)
|
||||||
|
.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS")
|
||||||
|
|
||||||
|
spark.createDataset(sc.textFile(s"$workingPath/dataset")
|
||||||
|
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
|
||||||
|
.map(DLIToOAF.convertDLIDatasetToExternalReference)
|
||||||
|
.filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
|
||||||
|
|
||||||
val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id")
|
val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id")
|
||||||
relDS = spark.read.load(s"$workingPath/relationDS").as[Relation]
|
val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation]
|
||||||
val relationTo = pf.joinWith(relDS, pf("id").equalTo(relDS("source")),"inner").map(t =>t._2)
|
val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2)
|
||||||
|
|
||||||
val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference]
|
val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference]
|
||||||
|
|
||||||
|
@ -100,11 +115,64 @@ object SparkExportContentForOpenAire {
|
||||||
(f._1, dli_ext)
|
(f._1, dli_ext)
|
||||||
})).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped")
|
})).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped")
|
||||||
|
|
||||||
|
val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication]
|
||||||
|
|
||||||
|
val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])]
|
||||||
|
|
||||||
|
groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t =>
|
||||||
|
{
|
||||||
|
val publication = t._2
|
||||||
|
if (t._1 != null) {
|
||||||
|
val eRefs = t._1._2
|
||||||
|
DLIToOAF.insertExternalRefs(publication, eRefs)
|
||||||
|
|
||||||
|
} else
|
||||||
|
publication
|
||||||
|
}
|
||||||
|
).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS")
|
||||||
|
|
||||||
|
|
||||||
|
spark.createDataset(sc.textFile(s"$workingPath/dataset")
|
||||||
|
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
|
||||||
|
.map(DLIToOAF.convertClinicalTrial)
|
||||||
|
.filter(p => p != null))
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials")
|
||||||
|
|
||||||
|
val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)]
|
||||||
|
|
||||||
|
val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
|
||||||
|
|
||||||
|
relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner")
|
||||||
|
.map(k =>{
|
||||||
|
val currentRel = k._1
|
||||||
|
currentRel.setTarget(k._2._2)
|
||||||
|
currentRel
|
||||||
|
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels")
|
||||||
|
|
||||||
|
|
||||||
|
val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation]
|
||||||
|
val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation]
|
||||||
|
|
||||||
|
rels.union(clRels).flatMap(r => {
|
||||||
|
val inverseRel = new Relation
|
||||||
|
inverseRel.setSource(r.getTarget)
|
||||||
|
inverseRel.setTarget(r.getSource)
|
||||||
|
inverseRel.setDataInfo(r.getDataInfo)
|
||||||
|
inverseRel.setCollectedfrom(r.getCollectedfrom)
|
||||||
|
inverseRel.setRelType(r.getRelType)
|
||||||
|
inverseRel.setSubRelType(r.getSubRelType)
|
||||||
|
inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass))
|
||||||
|
List(r, inverseRel)
|
||||||
|
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
|
||||||
|
|
||||||
|
|
||||||
|
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
|
||||||
|
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet)
|
||||||
|
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet)
|
||||||
|
|
||||||
|
|
||||||
|
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -113,6 +181,4 @@ object SparkExportContentForOpenAire {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue