diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index b38e103bcb..096217a552 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -14,6 +14,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex +case class CrossrefDT(doi: String, json:String) {} + case class mappingAffiliation(name: String) {} case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala new file mode 100644 index 0000000000..996ba55855 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -0,0 +1,93 @@ +package eu.dnetlib.doiboost.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +object CrossrefDataset { + + + def extractTimestamp(input:String): Long = { + + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + (json\"indexed"\"timestamp").extractOrElse[Long](0) + + } + + + def main(args: Array[String]): Unit = { + + + val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkMapDumpIntoOAF.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + import spark.implicits._ + + + val crossrefAggregator = new Aggregator[CrossrefDT, CrossrefDT, CrossrefDT] with Serializable { + + override def zero: CrossrefDT = null + + override def reduce(b: CrossrefDT, a: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + val tb = extractTimestamp(b.json) + val ta = extractTimestamp(a.json) + if(ta >tb) { + return a + } + b + } + + override def merge(a: CrossrefDT, b: CrossrefDT): CrossrefDT = { + if (b == null) + return a + if (a == null) + return b + + val tb = extractTimestamp(b.json) + val ta = extractTimestamp(a.json) + if(ta >tb) { + return a + } + b + } + + override def bufferEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def outputEncoder: Encoder[CrossrefDT] = implicitly[Encoder[CrossrefDT]] + + override def finish(reduction: CrossrefDT): CrossrefDT = reduction + } + + val sourcePath:String = parser.get("sourcePath") + val targetPath:String = parser.get("targetPath") + + val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT] + + ds.groupByKey(_.doi) + .agg(crossrefAggregator.toColumn) + .map(s=>s._2) + .write.mode(SaveMode.Overwrite).save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml index db4ac96f94..be4a45afe5 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml @@ -46,11 +46,11 @@ ${jobTracker} ${nameNode} eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_dump + -t${workingPath}/input/crossref/index_dump_1 -n${nameNode} -ts${timestamp} - + @@ -68,7 +68,7 @@ --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - --sourcePath${workingPath}/input/crossref/index_dump,${workingPath}/crossref/index_dump + --sourcePath${workingPath}/input/crossref/index_dump,${workingPath}/input/crossref/index_dump_1,${workingPath}/crossref/index_dump --targetPath${workingPath}/input/crossref --masteryarn-cluster @@ -76,5 +76,28 @@ + + + + + yarn-cluster + cluster + ExtractCrossrefToOAF + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath/data/doiboost/crossref/cr_dataset + --targetPath/data/doiboost/crossref/crossrefDataset + --masteryarn-cluster + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json new file mode 100644 index 0000000000..312bd0751e --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index bf91958cfa..e35f88abd5 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -89,7 +89,7 @@ --dbPublicationPath${workingDirPath}/doiBoostPublicationFiltered --dbDatasetPath${workingDirPath}/crossrefDataset - --crossRefRelation/data/doiboost/input/crossref/relations + --crossRefRelation${workingDirPath}/crossrefRelation --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation -do${workingDirPath}/doiBoostOrganization --targetPath${workingDirPath}/actionDataSet diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala index c393f0ae9c..f23996420a 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala @@ -1,54 +1,45 @@ package eu.dnetlib.dhp.doiboost -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, StructuredProperty, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.Project +import org.apache.spark.SparkContext import org.apache.spark.sql.functions.{col, sum} +import org.apache.hadoop.io.Text +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} - +import org.codehaus.jackson.map.ObjectMapper +import org.json4s.DefaultFormats +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods._ import scala.:: import scala.collection.JavaConverters._ class QueryTest { + def extract_payload(input:String) :String = { - def extractLicense(p:Publication):Tuple2[String,String] = { - - val tmp = p.getInstance().asScala.map(i => i.getLicense.getValue).distinct.mkString(",") - (p.getId,tmp) - } + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) - - def hasDOI(publication: Publication, doi:String):Boolean = { + compact(render((json \ "payload"))) - val s = publication.getOriginalId.asScala.filter(i => i.equalsIgnoreCase(doi)) - - s.nonEmpty } - def hasNullHostedBy(publication: Publication):Boolean = { - publication.getInstance().asScala.exists(i => i.getHostedby == null || i.getHostedby.getValue == null) - } + + def myQuery(spark:SparkSession, sc:SparkContext): Unit = { + implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project] + + +// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text]) +// .map(_._2.toString) +// .map(s => new ObjectMapper().readValue(s, classOf[Project]))) +// +// ds.write.saveAsTable() - def myQuery(spark:SparkSession): Unit = { - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] - implicit val mapEncoderDat: Encoder[OafDataset] = Encoders.kryo[OafDataset] - implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] - - val doiboostPubs:Dataset[Publication] = spark.read.load("/data/doiboost/process/doiBoostPublicationFiltered").as[Publication] - - val relFunder: Dataset[Relation] = spark.read.format("org.apache.spark.sql.parquet").load("/data/doiboost/process/crossrefRelation").as[Relation] - - doiboostPubs.filter(p => p.getDateofacceptance != null && p.getDateofacceptance.getValue!= null && p.getDateofacceptance.getValue.length > 0 ) - - doiboostPubs.filter(p=>hasDOI(p, "10.1016/j.is.2020.101522")).collect()(0).getDescription.get(0).getValue - - - - doiboostPubs.filter(p=> hasNullHostedBy(p)).count() - - doiboostPubs.map(p=> (p.getId, p.getBestaccessright.getClassname))(Encoders.tuple(Encoders.STRING,Encoders.STRING)) } } diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index f62ac2b67c..a3bb2a4f48 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -19,8 +19,6 @@ class CrossrefMappingTest { - - @Test def testFunderRelationshipsMapping(): Unit = { val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index a0a334e3c1..38c5c8af7e 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -84,6 +84,12 @@ ${project.version} + + eu.dnetlib.dhp + dhp-dedup-openaire + ${project.version} + + com.jayway.jsonpath json-path diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala index d1bf39475a..90d665e0c1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -1,4 +1,5 @@ package eu.dnetlib.dhp.sx.ebi +import eu.dnetlib.dhp.oa.dedup.AuthorMerger import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} import org.apache.spark.sql.{Encoder, Encoders} @@ -14,6 +15,7 @@ object EBIAggregator { override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -22,6 +24,7 @@ object EBIAggregator { override def merge(wx: OafDataset, wy: OafDataset): OafDataset = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -35,8 +38,6 @@ object EBIAggregator { Encoders.kryo(classOf[OafDataset]) } - - def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{ override def zero: DLIUnknown = new DLIUnknown() @@ -69,6 +70,7 @@ object EBIAggregator { override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -76,6 +78,7 @@ object EBIAggregator { override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -96,6 +99,8 @@ object EBIAggregator { override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) + if (b.getId == null) b.setId(a._2.getId) b @@ -104,6 +109,7 @@ object EBIAggregator { override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -124,6 +130,7 @@ object EBIAggregator { override def reduce(b: Publication, a: (String, Publication)): Publication = { b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) if (b.getId == null) b.setId(a._2.getId) b @@ -132,6 +139,7 @@ object EBIAggregator { override def merge(wx: Publication, wy: Publication): Publication = { wx.mergeFrom(wy) + wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) if(wx.getId == null && wy.getId.nonEmpty) wx.setId(wy.getId) wx @@ -145,7 +153,6 @@ object EBIAggregator { Encoders.kryo(classOf[Publication]) } - def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{ override def zero: Relation = new Relation() @@ -166,10 +173,4 @@ object EBIAggregator { override def outputEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) } - - - - - - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql index 778e3afd21..ea483a4a74 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql @@ -1,10 +1,10 @@ DROP VIEW IF EXISTS ${hiveDbName}.result; CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p + select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.publication p union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d + select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.dataset d union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s + select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.software s union all - select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o; + select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance from ${hiveDbName}.otherresearchproduct o; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java index ed3b6efdcc..ce00466df0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporterTest.java @@ -2,4 +2,5 @@ package eu.dnetlib.dhp.sx.graph; public class SparkScholexplorerGraphImporterTest { + } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json new file mode 100644 index 0000000000..539dd5e62b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/graph/publication.json @@ -0,0 +1,10 @@ +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters...","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["10.3390/w11050916"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2018-10-28T00:39:04.337Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao, Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan, Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson, Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu, Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao, Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao, Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-01","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":"In terms of climate change and precipitation, there is large interest in how large-scale climatic features affect regional rainfall amount and rainfall occurrence. Large-scale climate elements need to be downscaled to the regional level for hydrologic applications. Here, a new Nonhomogeneous Hidden Markov Model (NHMM) called the Bayesian-NHMM is presented for downscaling and predicting of multisite daily rainfall during rainy season over the Huaihe River Basin (HRB). The Bayesian-NHMM provides a Bayesian method for parameters estimation. The model avoids the risk to have no solutions for parameter estimation, which often occurs in the traditional NHMM that uses point estimates of parameters. The Bayesian-NHMM accurately captures seasonality and interannual variability of rainfall amount and wet days during the rainy season. The model establishes a link between large-scale meteorological characteristics and local precipitation patterns. It also provides a more stable and efficient method to estimate parameters in the model. These results suggest that prediction of daily precipitation could be improved by the suggested new Bayesian-NHMM method, which can be helpful for water resources management and research on climate change.","dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[{"license":null,"accessright":null,"instancetype":null,"hostedby":{"key":"openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18","value":"Unknown Repository","dataInfo":null},"url":["10.3390/w11050916"],"distributionlocation":null,"collectedfrom":null,"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":null}],"journal":null,"originalObjIdentifier":"datacite____::100bb045f34ea2da81433d0b9ae3afa1","dlicollectedfrom":[{"id":"dli_________::datacite","name":"Datasets in Datacite","completionStatus":"complete","collectionMode":null}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-10-04T14:16:06.105Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-09-27T11:39:38.835Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-30T11:48:49.809Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-14T14:25:55.176Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} +{"collectedfrom":[{"key":"dli_________::crossref","value":"Crossref","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"id":"50|1307198540d2264d839dfd8c9a19f4a7","originalId":["1307198540d2264d839dfd8c9a19f4a7"],"pid":[{"value":"10.3390/w11050916","qualifier":{"classid":"doi","classname":"doi","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":null}],"dateofcollection":"2020-08-09T11:35:23.526Z","dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"author":[{"fullname":"Cao Qing","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Zhenchun","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Yuan Feifei","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Berndtsson Ronny","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Xu Shijie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Gao Huibin","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null},{"fullname":"Hao Jie","name":null,"surname":null,"rank":null,"pid":null,"affiliation":null}],"resulttype":{"classid":"publication","classname":"publication","schemeid":"publication","schemename":"publication"},"language":null,"country":null,"subject":[],"title":[{"value":"On the Predictability of Daily Rainfall during Rainy Season over the Huaihe River Basin","qualifier":{"classid":"main title","classname":null,"schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":null}],"relevantdate":[{"value":"2019-05-02T07:15:22Z","qualifier":{"classid":"date","classname":"date","schemeid":"dnet::date","schemename":"dnet::date"},"dataInfo":null}],"description":[{"value":null,"dataInfo":null}],"dateofacceptance":null,"publisher":{"value":"MDPI AG","dataInfo":null},"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"context":null,"externalReference":null,"instance":[],"journal":null,"originalObjIdentifier":"dli_resolver::1307198540d2264d839dfd8c9a19f4a7","dlicollectedfrom":[{"id":"dli_________::crossref","name":"Crossref","completionStatus":"complete","collectionMode":"resolved"}],"completionStatus":"complete"} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index 165c3340b3..6c6e2c8356 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -32,10 +32,10 @@ object SparkExportContentForOpenAire { .master(parser.get("master")).getOrCreate() - val sc:SparkContext = spark.sparkContext - val workingPath = parser.get("workingDirPath") + implicit val dliPubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication]) + implicit val dliDatEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset]) implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication]) implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset]) implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation]) @@ -43,40 +43,41 @@ object SparkExportContentForOpenAire { import spark.implicits._ - val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j") - .map(s => new ObjectMapper().readValue(s, classOf[Relation])) - .filter(p => p.getDataInfo.getDeletedbyinference == false) - spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS") + val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation] + dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS") - val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) + + val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication] + dsPubs + .filter(p=>p.getDataInfo.getDeletedbyinference == false) + .map(DLIToOAF.convertDLIPublicationToOAF) + .filter(p=>p!= null) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS") + + + val dsDataset = spark.read.load(s"$workingPath/dataset").as[DLIDataset] + dsDataset .filter(p => p.getDataInfo.getDeletedbyinference == false) .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null) - spark.createDataset(datRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetDS") - - - val pubRDD:RDD[Publication] = sc.textFile(s"$workingPath/publication") - .map(s => new ObjectMapper().readValue(s, classOf[DLIPublication])) - .filter(p => p.getDataInfo.getDeletedbyinference == false) - .map(DLIToOAF.convertDLIPublicationToOAF).filter(p=>p!= null) - spark.createDataset(pubRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetDS") - val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS").as[Publication] - val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/datasetDS").as[OafDataset] - val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/relationDS").as[Relation] + + val pubs:Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS").as[Publication] + val dats :Dataset[OafDataset] = spark.read.load(s"$workingPath/export/datasetDS").as[OafDataset] + val relDS1 :Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS").as[Relation] val pub_id = pubs.select("id").distinct() val dat_id = dats.select("id").distinct() - pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_f1") + pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_f1") - val relDS2= spark.read.load(s"$workingPath/relationDS_f1").as[Relation] + val relDS2= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation] - relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS_filtered") + relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_filtered") val r_source = relDS2.select(relDS2("source")).distinct() @@ -87,22 +88,20 @@ object SparkExportContentForOpenAire { pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1) .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row") - .write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationDS_filtered") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS_filtered") dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1) .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row") - .write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS") - spark.createDataset(sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) - .map(DLIToOAF.convertDLIDatasetToExternalReference) - .filter(p => p != null)).as[DLIExternalReference].write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference") - val pf = spark.read.load(s"$workingPath/publicationDS_filtered").select("id") - val relDS3 = spark.read.load(s"$workingPath/relationDS").as[Relation] + dsDataset.map(DLIToOAF.convertDLIDatasetToExternalReference).filter(p => p != null).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference") + + val pf = spark.read.load(s"$workingPath/export/publicationDS_filtered").select("id") + val relDS3 = spark.read.load(s"$workingPath/export/relationDS").as[Relation] val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2) - val extRef = spark.read.load(s"$workingPath/externalReference").as[DLIExternalReference] + val extRef = spark.read.load(s"$workingPath/export/externalReference").as[DLIExternalReference] spark.createDataset(relationTo.joinWith(extRef, relationTo("target").equalTo(extRef("id")), "inner").map(d => { val r = d._1 @@ -112,11 +111,11 @@ object SparkExportContentForOpenAire { var dli_ext = ArrayBuffer[DLIExternalReference]() f._2.foreach(d => if (dli_ext.size < 100) dli_ext += d ) (f._1, dli_ext) - })).write.mode(SaveMode.Overwrite).save(s"$workingPath/externalReference_grouped") + })).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference_grouped") - val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/publicationDS_filtered").as[Publication] + val pubf :Dataset[Publication] = spark.read.load(s"$workingPath/export/publicationDS_filtered").as[Publication] - val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/externalReference_grouped").as[(String, List[DLIExternalReference])] + val groupedERf:Dataset[(String, List[DLIExternalReference])]= spark.read.load(s"$workingPath/export/externalReference_grouped").as[(String, List[DLIExternalReference])] groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t => { @@ -128,29 +127,28 @@ object SparkExportContentForOpenAire { } else publication } - ).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS") - spark.createDataset(sc.textFile(s"$workingPath/dataset") - .map(s => new ObjectMapper().readValue(s, classOf[DLIDataset])) + dsDataset .map(DLIToOAF.convertClinicalTrial) - .filter(p => p != null)) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrials") + .filter(p => p != null) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrials") - val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/clinicalTrials").as[(String,String)] + val ct:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/clinicalTrials").as[(String,String)] - val relDS= spark.read.load(s"$workingPath/relationDS_f1").as[Relation] + val relDS= spark.read.load(s"$workingPath/export/relationDS_f1").as[Relation] relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner") .map(k =>{ val currentRel = k._1 currentRel.setTarget(k._2._2) currentRel - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/clinicalTrialsRels") + }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrialsRels") - val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/clinicalTrialsRels").as[Relation] - val rels:Dataset[Relation] = spark.read.load(s"$workingPath/relationDS_filtered").as[Relation] + val clRels:Dataset[Relation] = spark.read.load(s"$workingPath/export/clinicalTrialsRels").as[Relation] + val rels:Dataset[Relation] = spark.read.load(s"$workingPath/export/relationDS_filtered").as[Relation] rels.union(clRels).flatMap(r => { val inverseRel = new Relation @@ -162,18 +160,18 @@ object SparkExportContentForOpenAire { inverseRel.setSubRelType(r.getSubRelType) inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass)) List(r, inverseRel) - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS") + }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationAS") - spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed") - spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed") + spark.read.load(s"$workingPath/export/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS_fixed") + spark.read.load(s"$workingPath/export/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS_fixed") - val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) - val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) - val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) + val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/relationAS").as[Relation].map(DLIToOAF.toActionSet) + val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) + val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/export/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) - fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) }