This commit is contained in:
Claudio Atzori 2020-10-09 13:53:21 +02:00
commit e751c1402f
12 changed files with 228 additions and 99 deletions

View File

@ -14,6 +14,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
case class CrossrefDT(doi: String, json:String) {}
case class mappingAffiliation(name: String) {}
case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
object CrossrefDataset {
def extractTimestamp(input:String): Long = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
(json\"indexed"\"timestamp").extractOrElse[Long](0)
}
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._
val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable {
override def zero: CrossrefDT = null
override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = {
if (b == null)
return a
if (a == null)
return b
val tb = extractTimestamp(b.json)
val ta = extractTimestamp(a.json)
if(ta >tb) {
return a
}
b
}
override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = {
if (b == null)
return a
if (a == null)
return b
val tb = extractTimestamp(b.json)
val ta = extractTimestamp(a.json)
if(ta >tb) {
return a
}
b
}
override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]
override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]]
override def finish(reduction: CrossrefDT): CrossrefDT = reduction
}
val sourcePath:String = parser.get("sourcePath")
val targetPath:String = parser.get("targetPath")
val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT]
ds.groupByKey(_.doi)
.agg(crossrefAggregator.toColumn)
.map(s=>s._2)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -46,11 +46,11 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump_1</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
</java>
<ok to="ExtractCrossrefToOAF"/>
<ok to="End"/>
<error to="Kill"/>
</action>
@ -68,7 +68,7 @@
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/index_dump,${workingPath}/crossref/index_dump</arg>
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/index_dump,${workingPath}/input/crossref/index_dump_1,${workingPath}/crossref/index_dump</arg>
<arg>--targetPath</arg><arg>${workingPath}/input/crossref</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
@ -76,5 +76,28 @@
<error to="Kill"/>
</action>
<action name="GenerateDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>/data/doiboost/crossref/cr_dataset</arg>
<arg>--targetPath</arg><arg>/data/doiboost/crossref/crossrefDataset</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,6 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -89,7 +89,7 @@
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingDirPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>/data/doiboost/input/crossref/relations</arg>
<arg>--crossRefRelation</arg><arg>${workingDirPath}/crossrefRelation</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
<arg>-do</arg><arg>${workingDirPath}/doiBoostOrganization</arg>
<arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>

View File

@ -1,54 +1,45 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.Project
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{col, sum}
import org.apache.hadoop.io.Text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.json4s.DefaultFormats
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.::
import scala.collection.JavaConverters._
class QueryTest {
def extract_payload(input:String) :String = {
def extractLicense(p:Publication):Tuple2[String,String] = {
val tmp = p.getInstance().asScala.map(i => i.getLicense.getValue).distinct.mkString(",")
(p.getId,tmp)
}
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
def hasDOI(publication: Publication, doi:String):Boolean = {
compact(render((json \ "payload")))
val s = publication.getOriginalId.asScala.filter(i => i.equalsIgnoreCase(doi))
s.nonEmpty
}
def hasNullHostedBy(publication: Publication):Boolean = {
publication.getInstance().asScala.exists(i => i.getHostedby == null || i.getHostedby.getValue == null)
}
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project]
// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text])
// .map(_._2.toString)
// .map(s => new ObjectMapper().readValue(s, classOf[Project])))
//
// ds.write.saveAsTable()
def myQuery(spark:SparkSession): Unit = {
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDat: Encoder[OafDataset] = Encoders.kryo[OafDataset]
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
val doiboostPubs:Dataset[Publication] = spark.read.load("/data/doiboost/process/doiBoostPublicationFiltered").as[Publication]
val relFunder: Dataset[Relation] = spark.read.format("org.apache.spark.sql.parquet").load("/data/doiboost/process/crossrefRelation").as[Relation]
doiboostPubs.filter(p => p.getDateofacceptance != null && p.getDateofacceptance.getValue!= null && p.getDateofacceptance.getValue.length > 0 )
doiboostPubs.filter(p=>hasDOI(p, "10.1016/j.is.2020.101522")).collect()(0).getDescription.get(0).getValue
doiboostPubs.filter(p=> hasNullHostedBy(p)).count()
doiboostPubs.map(p=> (p.getId, p.getBestaccessright.getClassname))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
}
}

View File

@ -19,8 +19,6 @@ class CrossrefMappingTest {
@Test
def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString

View File

@ -84,6 +84,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-dedup-openaire</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

View File

@ -1,4 +1,5 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.oa.dedup.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}
@ -14,6 +15,7 @@ object EBIAggregator {
override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = {
b.mergeFrom(a._2)
b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
if (b.getId == null)
b.setId(a._2.getId)
b
@ -22,6 +24,7 @@ object EBIAggregator {
override def merge(wx: OafDataset, wy: OafDataset): OafDataset = {
wx.mergeFrom(wy)
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
@ -35,8 +38,6 @@ object EBIAggregator {
Encoders.kryo(classOf[OafDataset])
}
def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{
override def zero: DLIUnknown = new DLIUnknown()
@ -69,6 +70,7 @@ object EBIAggregator {
override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = {
b.mergeFrom(a._2)
b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
if (b.getId == null)
b.setId(a._2.getId)
b
@ -76,6 +78,7 @@ object EBIAggregator {
override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = {
wx.mergeFrom(wy)
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
@ -96,6 +99,8 @@ object EBIAggregator {
override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = {
b.mergeFrom(a._2)
b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
if (b.getId == null)
b.setId(a._2.getId)
b
@ -104,6 +109,7 @@ object EBIAggregator {
override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = {
wx.mergeFrom(wy)
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
@ -124,6 +130,7 @@ object EBIAggregator {
override def reduce(b: Publication, a: (String, Publication)): Publication = {
b.mergeFrom(a._2)
b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
if (b.getId == null)
b.setId(a._2.getId)
b
@ -132,6 +139,7 @@ object EBIAggregator {
override def merge(wx: Publication, wy: Publication): Publication = {
wx.mergeFrom(wy)
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
if(wx.getId == null && wy.getId.nonEmpty)
wx.setId(wy.getId)
wx
@ -145,7 +153,6 @@ object EBIAggregator {
Encoders.kryo(classOf[Publication])
}
def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{
override def zero: Relation = new Relation()
@ -166,10 +173,4 @@ object EBIAggregator {
override def outputEncoder: Encoder[Relation] =
Encoders.kryo(classOf[Relation])
}
}

View File

@ -2,4 +2,5 @@
package eu.dnetlib.dhp.sx.graph;
public class SparkScholexplorerGraphImporterTest {
}

View File

@ -0,0 +1,10 @@
{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters...","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-10-04T14:16:06.105Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-09-27T11:39:38.835Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-30T11:48:49.809Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-14T14:25:55.176Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"}
{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-09T11:35:23.526Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"}

View File

@ -32,10 +32,10 @@ object SparkExportContentForOpenAire {
.master(parser.get("master")).getOrCreate()
val sc:SparkContext = spark.sparkContext
val workingPath = parser.get("workingDirPath")
implicit val dliPubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val dliDatEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication])
implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset])
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
@ -43,40 +43,41 @@ object SparkExportContentForOpenAire {
import spark.implicits._
val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
.map(s => new ObjectMapper().readValue(s, classOf[Relation]))
.filter(p => p.getDataInfo.getDeletedbyinference == false)
spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation]
dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication]
dsPubs
.filter(p=>p.getDataInfo.getDeletedbyinference == false)
.map(DLIToOAF.convertDLIPublicationToOAF)
.filter(p=>p!= null)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS")
val dsDataset = spark.read.load(s"$workingPath/dataset").as[DLIDataset]
dsDataset
.filter(p => p.getDataInfo.getDeletedbyinference == false)
.map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS")
val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication")
.map(s => new ObjectMapper().readValue(s, classOf[DLIPublication]))
.filter(p => p.getDataInfo.getDeletedbyinference == false)
.map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null)
spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetDS")
val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication]
val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset]
val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation]
val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS").as[Publication]
val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/export/datasetDS").as[OafDataset]
val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS").as[Relation]
val pub_id = pubs.select("id").distinct()
val dat_id = dats.select("id").distinct()
pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1")
pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_f1")
val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
val relDS2= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation]
relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered")
relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_filtered")
val r_source = relDS2.select(relDS2("source")).distinct()
@ -87,22 +88,20 @@ object SparkExportContentForOpenAire {
pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1)
.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS_filtered")
dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1)
.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS")
spark.createDataset(sc.textFile(s"$workingPath/dataset")
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
.map(DLIToOAF.convertDLIDatasetToExternalReference)
.filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference")
val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id")
val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation]
dsDataset.map(DLIToOAF.convertDLIDatasetToExternalReference).filter(p => p != null).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference")
val pf = spark.read.load(s"$workingPath/export/publicationDS_filtered").select("id")
val relDS3 = spark.read.load(s"$workingPath/export/relationDS").as[Relation]
val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2)
val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference]
val extRef = spark.read.load(s"$workingPath/export/externalReference").as[DLIExternalReference]
spark.createDataset(relationTo.joinWith(extRef, relationTo("target").equalTo(extRef("id")), "inner").map(d => {
val r = d._1
@ -112,11 +111,11 @@ object SparkExportContentForOpenAire {
var dli_ext = ArrayBuffer[DLIExternalReference]()
f._2.foreach(d => if (dli_ext.size < 100) dli_ext += d )
(f._1, dli_ext)
})).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped")
})).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference_grouped")
val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication]
val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS_filtered").as[Publication]
val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])]
val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/export/externalReference_grouped").as[(String, List[DLIExternalReference])]
groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t =>
{
@ -128,29 +127,28 @@ object SparkExportContentForOpenAire {
} else
publication
}
).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS")
).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS")
spark.createDataset(sc.textFile(s"$workingPath/dataset")
.map(s => new ObjectMapper().readValue(s, classOf[DLIDataset]))
dsDataset
.map(DLIToOAF.convertClinicalTrial)
.filter(p => p != null))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials")
.filter(p => p != null)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrials")
val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)]
val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/clinicalTrials").as[(String,String)]
val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation]
val relDS= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation]
relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner")
.map(k =>{
val currentRel = k._1
currentRel.setTarget(k._2._2)
currentRel
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels")
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrialsRels")
val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation]
val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation]
val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/export/clinicalTrialsRels").as[Relation]
val rels:Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS_filtered").as[Relation]
rels.union(clRels).flatMap(r => {
val inverseRel = new Relation
@ -162,18 +160,18 @@ object SparkExportContentForOpenAire {
inverseRel.setSubRelType(r.getSubRelType)
inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass))
List(r, inverseRel)
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationAS")
spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed")
spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed")
spark.read.load(s"$workingPath/export/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS_fixed")
spark.read.load(s"$workingPath/export/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS_fixed")
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet)
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet)
val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/relationAS").as[Relation].map(DLIToOAF.toActionSet)
val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet)
val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet)
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}