updated changing in the workflow of provision in the phase of aggregation. Removed serialization in JSON RDD and used spark Dataset

This commit is contained in:
Sandro La Bruzzo 2020-07-30 09:25:56 +02:00
parent 487226f669
commit 3010a362bc
12 changed files with 476 additions and 119 deletions

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
@ -35,6 +36,88 @@ object EBIAggregator {
}
def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{
override def zero: DLIUnknown = new DLIUnknown()
override def reduce(b: DLIUnknown, a: (String, DLIUnknown)): DLIUnknown = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIUnknown, wy: DLIUnknown): DLIUnknown = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIUnknown): DLIUnknown = reduction
override def bufferEncoder: Encoder[DLIUnknown] =
Encoders.kryo(classOf[DLIUnknown])
override def outputEncoder: Encoder[DLIUnknown] =
Encoders.kryo(classOf[DLIUnknown])
}
def getDLIDatasetAggregator(): Aggregator[(String, DLIDataset), DLIDataset, DLIDataset] = new Aggregator[(String, DLIDataset), DLIDataset, DLIDataset]{
override def zero: DLIDataset = new DLIDataset()
override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIDataset): DLIDataset = reduction
override def bufferEncoder: Encoder[DLIDataset] =
Encoders.kryo(classOf[DLIDataset])
override def outputEncoder: Encoder[DLIDataset] =
Encoders.kryo(classOf[DLIDataset])
}
def getDLIPublicationAggregator(): Aggregator[(String, DLIPublication), DLIPublication, DLIPublication] = new Aggregator[(String, DLIPublication), DLIPublication, DLIPublication]{
override def zero: DLIPublication = new DLIPublication()
override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = {
b.mergeFrom(a._2)
if (b.getId == null)
b.setId(a._2.getId)
b
}
override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = {
wx.mergeFrom(wy)
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
}
override def finish(reduction: DLIPublication): DLIPublication = reduction
override def bufferEncoder: Encoder[DLIPublication] =
Encoders.kryo(classOf[DLIPublication])
override def outputEncoder: Encoder[DLIPublication] =
Encoders.kryo(classOf[DLIPublication])
}
def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
override def zero: Publication = new Publication()
@ -85,5 +168,27 @@ object EBIAggregator {
}
def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
override def zero: DLIRelation = new DLIRelation()
override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
a._2
}
override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
if(b!= null) b else a
}
override def finish(reduction: DLIRelation): DLIRelation = reduction
override def bufferEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
override def outputEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
}
}

View File

@ -1,8 +1,9 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -12,6 +13,7 @@ import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions._
import scala.collection.JavaConverters._
@ -28,6 +30,64 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
}
def journalToOAF(pj:PMJournal): Journal = {
val j = new Journal
j.setIssnPrinted(pj.getIssn)
j.setVol(pj.getVolume)
j.setName(pj.getTitle)
j.setIss(pj.getIssue)
j.setDataInfo(OafUtils.generateDataInfo())
j
}
def pubmedTOPublication(input:PMArticle):DLIPublication = {
val dnetPublicationId = s"50|${DHPUtils.md5(s"${input.getPmid}::pmid")}"
val p = new DLIPublication
p.setId(dnetPublicationId)
p.setDataInfo(OafUtils.generateDataInfo())
p.setPid(List(OafUtils.createSP(input.getPmid.toLowerCase.trim, "pmid", "dnet:pid_types")).asJava)
p.setCompletionStatus("complete")
val pi = new ProvenaceInfo
pi.setId("dli_________::europe_pmc__")
pi.setName( "Europe PMC")
pi.setCompletionStatus("complete")
pi.setCollectionMode("collected")
p.setDlicollectedfrom(List(pi).asJava)
p.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
if (input.getAuthors != null && input.getAuthors.size() >0) {
var aths: List[Author] = List()
input.getAuthors.asScala.filter(a=> a!= null).foreach(a => {
val c = new Author
c.setFullname(a.getFullName)
c.setName(a.getForeName)
c.setSurname(a.getLastName)
aths = aths ::: List(c)
})
if (aths.nonEmpty)
p.setAuthor(aths.asJava)
}
if (input.getJournal != null)
p.setJournal(journalToOAF(input.getJournal))
p.setTitle(List(OafUtils.createSP(input.getTitle, "main title", "dnet:dataCite_title")).asJava)
p.setDateofacceptance(OafUtils.asField(input.getDate))
val i = new Instance
i.setCollectedfrom(generatePubmedDLICollectedFrom())
i.setDateofacceptance(p.getDateofacceptance)
i.setUrl(List(s"https://pubmed.ncbi.nlm.nih.gov/${input.getPmid}").asJava)
i.setInstancetype(createQualifier("0001", "Article", "dnet:publication_resource", "dnet:publication_resource"))
p.setInstance(List(i).asJava)
p
}
def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
val pmid :String = input._1
val input_json :String = input._2
@ -116,8 +176,16 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
val workingPath = parser.get("workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
implicit val strEncoder:Encoder[String] = Encoders.STRING
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
@ -133,6 +201,46 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
val idPublicationSolved:Dataset[String] = spark.read.load(s"$workingPath/baseline_links_updates").where(col("links").isNotNull).select("pmid").as[String]
val baseline:Dataset[(String, PMArticle)]= spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle].map(p=> (p.getPmid, p))(Encoders.tuple(strEncoder,PMEncoder))
idPublicationSolved.joinWith(baseline, idPublicationSolved("pmid").equalTo(baseline("_1"))).map(k => pubmedTOPublication(k._2._2)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_publication")
val pmaDatasets = spark.read.load("/user/sandro.labruzzo/scholix/EBI/ebi_garr/baseline_dataset").as[PMArticle]
pmaDatasets.map(p => pubmedTOPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_all")
val pubs: Dataset[(String,Publication)] = spark.read.load("/user/sandro.labruzzo/scholix/EBI/publication").as[Publication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,oafpubEncoder))
val pubdate:Dataset[(String,DLIPublication)] = spark.read.load(s"$workingPath/baseline_publication_all").as[DLIPublication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,pubEncoder))
pubs.joinWith(pubdate, pubs("_1").equalTo(pubdate("_1"))).map(k => k._2._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_ebi")
val dt : Dataset[DLIDataset] = spark.read.load(s"$workingPath/dataset").as[DLIDataset]
val update : Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_dataset").as[DLIDataset]
dt.union(update).map(d => (d.getId,d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
rel.union(relupdate)
.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite)
.save(s"$workingPath/baseline_relation_ebi")
}
}

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -10,6 +11,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateEBIDataFrame {
@ -34,54 +36,51 @@ object SparkCreateEBIDataFrame {
val relationMapper = RelationMapper.load
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
logger.info("Extract Publication and relation from publication_xml")
val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new PublicationScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
// logger.info("Extract Publication and relation from publication_xml")
// val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new PublicationScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
//
// val mapper = new ObjectMapper()
// mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
// spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
//
// logger.info("Extract Publication and relation from dataset_xml")
// val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
// {
// new ObjectMapper().readValue(s, classOf[String])
// }).flatMap(s => {
// val d = new DatasetScholexplorerParser
// d.parseObject(s, relationMapper).asScala.iterator})
val mapper = new ObjectMapper()
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
logger.info("Extract Publication and relation from dataset_xml")
val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
{
new ObjectMapper().readValue(s, classOf[String])
}).flatMap(s => {
val d = new DatasetScholexplorerParser
d.parseObject(s, relationMapper).asScala.iterator})
spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
// spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getPublicationAggregator().toColumn)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDatasetAggregator().toColumn)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getRelationAggregator().toColumn)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
}
}

View File

@ -25,7 +25,8 @@ public class PMAuthor implements Serializable {
}
public String getFullName() {
return String.format("%s, %s", this.foreName, this.lastName);
return String
.format("%s, %s", this.foreName != null ? this.foreName : "", this.lastName != null ? this.lastName : "");
}
}

View File

@ -1,5 +1,96 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
object SparkSplitOafTODLIEntities {
def getKeyRelation(rel:DLIRelation):String = {
s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
}
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
parser.parseArgument(args)
val workingPath: String = parser.get("workingPath")
logger.info(s"Working dir path = $workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val spark:SparkSession = SparkSession
.builder()
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
.master(parser.get("master"))
.getOrCreate()
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/OAFDataset").as[Oaf]
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIPublication])
.map(s =>s.asInstanceOf[DLIPublication])
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIDataset])
.map(s =>s.asInstanceOf[DLIDataset])
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
.map(s =>s.asInstanceOf[DLIUnknown])
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown")
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIRelation])
.map(s =>s.asInstanceOf[DLIRelation])
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
}
}

View File

@ -1,5 +1,73 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
/**
* This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous
* entities like Dataset, Relation, Publication and Unknown
*/
object SparkXMLToOAFDataset {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(SparkXMLToOAFDataset.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val relationMapper = RelationMapper.load
val inputPath: String = parser.get("sourcePath")
val entity: String = parser.get("entity")
val targetPath = parser.get("targetPath")
logger.info(s"Input path is $inputPath")
logger.info(s"Entity path is $entity")
logger.info(s"Target Path is $targetPath")
val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
.map(s => s._2.toString)
.flatMap(s => {
entity match {
case "publication" =>
val p = new PublicationScholexplorerParser
val l =p.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
case "dataset" =>
val d = new DatasetScholexplorerParser
val l =d.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
}
}).filter(s => s!= null)
spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
}
}

View File

@ -317,6 +317,15 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
.collect(Collectors.toList()));
}
// TERRIBLE HACK TO AVOID EMPTY COLLECTED FROM
if (parsedObject.getDlicollectedfrom() == null) {
final KeyValue cf = new KeyValue();
cf.setKey("dli_________::europe_pmc__");
cf.setValue("Europe PMC");
parsedObject.setCollectedfrom(Collections.singletonList(cf));
}
if (StringUtils.isNotBlank(resolvedURL)) {
Instance i = new Instance();
i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));

View File

@ -1,7 +1,7 @@
<configuration>
<!-- OCEAN -->
<!--
<property>
<name>jobTracker</name>
<value>yarnRM</value>
@ -18,26 +18,26 @@
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
-->
<!-- GARR -->
<property>
<name>jobTracker</name>
<value>yarn</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
</property>
<!-- <property>-->
<!-- <name>jobTracker</name>-->
<!-- <value>yarn</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>nameNode</name>-->
<!-- <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>hive_metastore_uris</name>-->
<!-- <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>spark2YarnHistoryServerAddress</name>-->
<!-- <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>-->
<!-- </property>-->
<property>

View File

@ -18,7 +18,7 @@
</property>
</parameters>
<start to="GenerateUpdates"/>
<start to="CreateEBIDataSet"/>
<kill name="Kill">
@ -48,6 +48,28 @@
<error to="Kill"/>
</action>
<action name="CreateEBIDataSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=1000
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="GenerateUpdates"/>
<error to="Kill"/>
</action>
<action name="GenerateUpdates">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
@ -71,27 +93,7 @@
</action>
<action name="CreateEBIDataSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create EBI DataSet</name>
<class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=1000
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,7 +1,4 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true},
{"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true},
{"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true}
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
]

View File

@ -101,12 +101,17 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Import ${entity} and related entities</name>
<class>eu.dnetlib.dhp.sx.graph.SparkScholexplorerGraphImporter</class>
<class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
<arg>--targetPath</arg><arg>${targetEntityPath}</arg>
<arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
<arg>--entity</arg><arg>${entity}</arg>
</spark>
<ok to="End"/>

View File

@ -1,16 +1,8 @@
<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>targetPath</name>
<description>the source path</description>
</property>
<property>
<name>targetDir</name>
<description>the name of the path</description>
<name>workingPath</name>
<description>the working path</description>
</property>
<property>
<name>sparkDriverMemory</name>
@ -20,32 +12,13 @@
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>entities</name>
<description>the entities to be extracted</description>
</property>
</parameters>
<start to="DeleteTargetPath"/>
<start to="ExtractDLIEntities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteTargetPath">
<fs>
<mkdir path="${targetPath}"/>
<mkdir path="${targetPath}/dataset"/>
<mkdir path="${targetPath}/publication"/>
<mkdir path="${targetPath}/unknown"/>
<mkdir path="${targetPath}/relation"/>
<delete path='${targetPath}/dataset/${targetDir}'/>
<delete path='${targetPath}/publication/${targetDir}'/>
<delete path='${targetPath}/unknown/${targetDir}'/>
<delete path='${targetPath}/relation/${targetDir}'/>
</fs>
<ok to="ExtractDLIEntities"/>
<error to="Kill"/>
</action>
<action name="ExtractDLIEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -53,19 +26,18 @@
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Extract ${entities}</name>
<class>eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob</class>
<name>Extract DLI Entities</name>
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--targetDir</arg><arg>${targetDir}</arg>
<arg>--entities</arg><arg>${entities}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>