diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index b38e103bc..096217a55 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -14,6 +14,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex +case class CrossrefDT(doi: String, json:String) {} + case class mappingAffiliation(name: String) {} case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala new file mode 100644 index 000000000..996ba5585 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -0,0 +1,93 @@ +package eu.dnetlib.doiboost.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +object CrossrefDataset { + + + def extractTimestamp(input:String): Long = { + + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + (json\"indexed"\"timestamp").extractOrElse[Long](0) + + } + + + def main(args: Array[String]): Unit = { + + + val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkMapDumpIntoOAF.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + import spark.implicits._ + + + val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable { + + override def zero: CrossrefDT = null + + override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + val tb = extractTimestamp(b.json) + val ta = extractTimestamp(a.json) + if(ta >tb) { + return a + } + b + } + + override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + val tb = extractTimestamp(b.json) + val ta = extractTimestamp(a.json) + if(ta >tb) { + return a + } + b + } + + override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def finish(reduction: CrossrefDT): CrossrefDT = reduction + } + + val sourcePath:String = parser.get("sourcePath") + val targetPath:String = parser.get("targetPath") + + val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT] + + ds.groupByKey(_.doi) + .agg(crossrefAggregator.toColumn) + .map(s=>s._2) + .write.mode(SaveMode.Overwrite).save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml index db4ac96f9..be4a45afe 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml @@ -46,11 +46,11 @@ ${jobTracker} ${nameNode} eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_dump + -t${workingPath}/input/crossref/index_dump_1 -n${nameNode} -ts${timestamp} - + @@ -68,7 +68,7 @@ --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - --sourcePath${workingPath}/input/crossref/index_dump,${workingPath}/crossref/index_dump + --sourcePath${workingPath}/input/crossref/index_dump,${workingPath}/input/crossref/index_dump_1,${workingPath}/crossref/index_dump --targetPath${workingPath}/input/crossref --masteryarn-cluster @@ -76,5 +76,28 @@ + + + + + yarn-cluster + cluster + ExtractCrossrefToOAF + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath/data/doiboost/crossref/cr_dataset + --targetPath/data/doiboost/crossref/crossrefDataset + --masteryarn-cluster + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json new file mode 100644 index 000000000..312bd0751 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index bf91958cf..e35f88abd 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -89,7 +89,7 @@ --dbPublicationPath${workingDirPath}/doiBoostPublicationFiltered --dbDatasetPath${workingDirPath}/crossrefDataset - --crossRefRelation/data/doiboost/input/crossref/relations + --crossRefRelation${workingDirPath}/crossrefRelation --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation -do${workingDirPath}/doiBoostOrganization --targetPath${workingDirPath}/actionDataSet diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala index c393f0ae9..f23996420 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala @@ -1,54 +1,45 @@ package eu.dnetlib.dhp.doiboost -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, StructuredProperty, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.Project +import org.apache.spark.SparkContext import org.apache.spark.sql.functions.{col, sum} +import org.apache.hadoop.io.Text +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} - +import org.codehaus.jackson.map.ObjectMapper +import org.json4s.DefaultFormats +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods._ import scala.:: import scala.collection.JavaConverters._ class QueryTest { + def extract_payload(input:String) :String = { - def extractLicense(p:Publication):Tuple2[String,String] = { - - val tmp = p.getInstance().asScala.map(i => i.getLicense.getValue).distinct.mkString(",") - (p.getId,tmp) - } + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) - - def hasDOI(publication: Publication, doi:String):Boolean = { + compact(render((json \ "payload"))) - val s = publication.getOriginalId.asScala.filter(i => i.equalsIgnoreCase(doi)) - - s.nonEmpty } - def hasNullHostedBy(publication: Publication):Boolean = { - publication.getInstance().asScala.exists(i => i.getHostedby == null || i.getHostedby.getValue == null) - } + + def myQuery(spark:SparkSession, sc:SparkContext): Unit = { + implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project] + + +// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text]) +// .map(_._2.toString) +// .map(s => new ObjectMapper().readValue(s, classOf[Project]))) +// +// ds.write.saveAsTable() - def myQuery(spark:SparkSession): Unit = { - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] - implicit val mapEncoderDat: Encoder[OafDataset] = Encoders.kryo[OafDataset] - implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] - - val doiboostPubs:Dataset[Publication] = spark.read.load("/data/doiboost/process/doiBoostPublicationFiltered").as[Publication] - - val relFunder: Dataset[Relation] = spark.read.format("org.apache.spark.sql.parquet").load("/data/doiboost/process/crossrefRelation").as[Relation] - - doiboostPubs.filter(p => p.getDateofacceptance != null && p.getDateofacceptance.getValue!= null && p.getDateofacceptance.getValue.length > 0 ) - - doiboostPubs.filter(p=>hasDOI(p, "10.1016/j.is.2020.101522")).collect()(0).getDescription.get(0).getValue - - - - doiboostPubs.filter(p=> hasNullHostedBy(p)).count() - - doiboostPubs.map(p=> (p.getId, p.getBestaccessright.getClassname))(Encoders.tuple(Encoders.STRING,Encoders.STRING)) } } diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index f62ac2b67..a3bb2a4f4 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -19,8 +19,6 @@ class CrossrefMappingTest { - - @Test def testFunderRelationshipsMapping(): Unit = { val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index a0a334e3c..38c5c8af7 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -84,6 +84,12 @@ ${project.version} + + eu.dnetlib.dhp + dhp-dedup-openaire + ${project.version} + + com.jayway.jsonpath json-path diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala index d1bf39475..90d665e0c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -1,4 +1,5 @@ package eu.dnetlib.dhp.sx.ebi +import eu.dnetlib.dhp.oa.dedup.AuthorMerger import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} import org.apache.spark.sql.{Encoder, Encoders} @@ -14,6 +15,7 @@ object EBIAggregator { override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -22,6 +24,7 @@ object EBIAggregator { override def merge(wx: OafDataset, wy: OafDataset): OafDataset = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -35,8 +38,6 @@ object EBIAggregator { Encoders.kryo(classOf[OafDataset]) } - - def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{ override def zero: DLIUnknown = new DLIUnknown() @@ -69,6 +70,7 @@ object EBIAggregator { override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -76,6 +78,7 @@ object EBIAggregator { override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -96,6 +99,8 @@ object EBIAggregator { override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) + if (b.getId == null) b.setId(a._2.getId) b @@ -104,6 +109,7 @@ object EBIAggregator { override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -124,6 +130,7 @@ object EBIAggregator { override def reduce(b: Publication, a: (String, Publication)): Publication = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -132,6 +139,7 @@ object EBIAggregator { override def merge(wx: Publication, wy: Publication): Publication = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -145,7 +153,6 @@ object EBIAggregator { Encoders.kryo(classOf[Publication]) } - def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{ override def zero: Relation = new Relation() @@ -166,10 +173,4 @@ object EBIAggregator { override def outputEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) } - - - - - - } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java index ed3b6efdc..ce00466df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java @@ -2,4 +2,5 @@ package eu.dnetlib.dhp.sx.graph; public class SparkScholexplorerGraphImporterTest { + } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json new file mode 100644 index 000000000..539dd5e62 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json @@ -0,0 +1,10 @@ +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters...","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-10-04T14:16:06.105Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-09-27T11:39:38.835Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-30T11:48:49.809Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-14T14:25:55.176Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-09T11:35:23.526Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index 165c3340b..6c6e2c835 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -32,10 +32,10 @@ object SparkExportContentForOpenAire { .master(parser.get("master")).getOrCreate() - val sc:SparkContext = spark.sparkContext - val workingPath = parser.get("workingDirPath") + implicit val dliPubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication]) + implicit val dliDatEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset]) implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication]) implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset]) implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation]) @@ -43,40 +43,41 @@ object SparkExportContentForOpenAire { import spark.implicits._ - val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j") - .map(s => new ObjectMapper().readValue(s, classOf[Relation])) - .filter(p => p.getDataInfo.getDeletedbyinference == false) - spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS") + val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation] + dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") - val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) + + val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication] + dsPubs + .filter(p=>p.getDataInfo.getDeletedbyinference == false) + .map(DLIToOAF.convertDLIPublicationToOAF) + .filter(p=>p!= null) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS") + + + val dsDataset = spark.read.load(s"$workingPath/dataset").as[DLIDataset] + dsDataset .filter(p => p.getDataInfo.getDeletedbyinference == false) .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null) - spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS") - - - val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication") - .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication])) - .filter(p => p.getDataInfo.getDeletedbyinference == false) - .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null) - spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetDS") - val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication] - val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset] - val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation] + + val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS").as[Publication] + val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/export/datasetDS").as[OafDataset] + val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS").as[Relation] val pub_id = pubs.select("id").distinct() val dat_id = dats.select("id").distinct() - pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1") + pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_f1") - val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation] + val relDS2= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation] - relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered") + relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_filtered") val r_source = relDS2.select(relDS2("source")).distinct() @@ -87,22 +88,20 @@ object SparkExportContentForOpenAire { pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1) .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row") - .write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS_filtered") dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1) .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row") - .write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS") - spark.createDataset(sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) - .map(DLIToOAF.convertDLIDatasetToExternalReference) - .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference") - val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id") - val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation] + dsDataset.map(DLIToOAF.convertDLIDatasetToExternalReference).filter(p => p != null).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference") + + val pf = spark.read.load(s"$workingPath/export/publicationDS_filtered").select("id") + val relDS3 = spark.read.load(s"$workingPath/export/relationDS").as[Relation] val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2) - val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference] + val extRef = spark.read.load(s"$workingPath/export/externalReference").as[DLIExternalReference] spark.createDataset(relationTo.joinWith(extRef, relationTo("target").equalTo(extRef("id")), "inner").map(d => { val r = d._1 @@ -112,11 +111,11 @@ object SparkExportContentForOpenAire { var dli_ext = ArrayBuffer[DLIExternalReference]() f._2.foreach(d => if (dli_ext.size < 100) dli_ext += d ) (f._1, dli_ext) - })).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped") + })).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference_grouped") - val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication] + val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS_filtered").as[Publication] - val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])] + val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/export/externalReference_grouped").as[(String, List[DLIExternalReference])] groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t => { @@ -128,29 +127,28 @@ object SparkExportContentForOpenAire { } else publication } - ).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS") - spark.createDataset(sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) + dsDataset .map(DLIToOAF.convertClinicalTrial) - .filter(p => p != null)) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials") + .filter(p => p != null) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrials") - val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)] + val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/clinicalTrials").as[(String,String)] - val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation] + val relDS= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation] relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner") .map(k =>{ val currentRel = k._1 currentRel.setTarget(k._2._2) currentRel - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels") + }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrialsRels") - val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation] - val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation] + val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/export/clinicalTrialsRels").as[Relation] + val rels:Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS_filtered").as[Relation] rels.union(clRels).flatMap(r => { val inverseRel = new Relation @@ -162,18 +160,18 @@ object SparkExportContentForOpenAire { inverseRel.setSubRelType(r.getSubRelType) inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass)) List(r, inverseRel) - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS") + }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationAS") - spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed") - spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed") + spark.read.load(s"$workingPath/export/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS_fixed") + spark.read.load(s"$workingPath/export/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS_fixed") - val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) - val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) - val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) + val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/relationAS").as[Relation].map(DLIToOAF.toActionSet) + val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) + val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) - fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) }