From 46631a44217917f8a23b4c019d7f73641e255e67 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 24 Jun 2020 14:06:38 +0200 Subject: [PATCH] updated mapping scholexplorer to OAF --- .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 16 +++++++++++++++- .../export/SparkExportContentForOpenAire.scala | 11 +++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala index 637362acf0..967834d694 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala @@ -5,7 +5,7 @@ import java.time.format.DateTimeFormatter import eu.dnetlib.dhp.common.PacePerson import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty} +import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.commons.lang3.StringUtils @@ -99,6 +99,20 @@ object DLIToOAF { ) + def fixInstance(r:Publication) :Publication = { + val collectedFrom = r.getCollectedfrom.asScala.head + r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom)) + r + } + + + def fixInstanceDataset(r:Dataset) :Dataset = { + val collectedFrom = r.getCollectedfrom.asScala.head + r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom)) + r + } + + def toActionSet(item: Oaf): (String, String) = { val mapper = new ObjectMapper() diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index edf951df4d..fd8f2d136d 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.`export` import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import org.apache.commons.io.IOUtils import org.apache.hadoop.io.Text @@ -166,10 +166,13 @@ object SparkExportContentForOpenAire { }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS") - val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) - val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet) - val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet) + spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed") + spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed") + + val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) + val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) + val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) }