forked from D-Net/dnet-hadoop
Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop into orcid-no-doi
This commit is contained in:
commit
fa1855a4b8
|
@ -6,7 +6,7 @@
|
||||||
<groupId>eu.dnetlib.dhp</groupId>
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
<artifactId>dhp</artifactId>
|
<artifactId>dhp</artifactId>
|
||||||
<version>1.2.4-SNAPSHOT</version>
|
<version>1.2.4-SNAPSHOT</version>
|
||||||
<relativePath>../</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<artifactId>dhp-schemas</artifactId>
|
<artifactId>dhp-schemas</artifactId>
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
package eu.dnetlib.dhp.schema.orcid;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class OrcidDOI {
|
||||||
|
private String doi;
|
||||||
|
private List<AuthorData> authors;
|
||||||
|
|
||||||
|
public String getDoi() {
|
||||||
|
return doi;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDoi(String doi) {
|
||||||
|
this.doi = doi;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AuthorData> getAuthors() {
|
||||||
|
return authors;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAuthors(List<AuthorData> authors) {
|
||||||
|
this.authors = authors;
|
||||||
|
}
|
||||||
|
}
|
|
@ -200,7 +200,7 @@ case object Crossref2Oaf {
|
||||||
a.setSurname(family)
|
a.setSurname(family)
|
||||||
a.setFullname(s"$given $family")
|
a.setFullname(s"$given $family")
|
||||||
if (StringUtils.isNotBlank(orcid))
|
if (StringUtils.isNotBlank(orcid))
|
||||||
a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava)
|
a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateDataInfo())).asJava)
|
||||||
|
|
||||||
a
|
a
|
||||||
}
|
}
|
||||||
|
@ -248,7 +248,7 @@ case object Crossref2Oaf {
|
||||||
|
|
||||||
|
|
||||||
def snsfRule(award:String): String = {
|
def snsfRule(award:String): String = {
|
||||||
var tmp1 = StringUtils.substringAfter(award,"_")
|
val tmp1 = StringUtils.substringAfter(award,"_")
|
||||||
val tmp2 = StringUtils.substringBefore(tmp1,"/")
|
val tmp2 = StringUtils.substringBefore(tmp1,"/")
|
||||||
logger.debug(s"From $award to $tmp2")
|
logger.debug(s"From $award to $tmp2")
|
||||||
tmp2
|
tmp2
|
||||||
|
@ -294,7 +294,7 @@ case object Crossref2Oaf {
|
||||||
}
|
}
|
||||||
|
|
||||||
def getProjectId (nsPrefix:String, targetId:String):String = {
|
def getProjectId (nsPrefix:String, targetId:String):String = {
|
||||||
"40|$nsPrefix::$targetId"
|
s"40|$nsPrefix::$targetId"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.crossref
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.apache.hadoop.io.{IntWritable, Text}
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
import org.apache.spark.sql.expressions.Aggregator
|
import org.apache.spark.sql.expressions.Aggregator
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
|
@ -12,21 +13,23 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
object CrossrefDataset {
|
object CrossrefDataset {
|
||||||
|
|
||||||
|
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
||||||
|
|
||||||
def extractTimestamp(input:String): Long = {
|
|
||||||
|
def to_item(input:String):CrossrefDT = {
|
||||||
|
|
||||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
lazy val json: json4s.JValue = parse(input)
|
lazy val json: json4s.JValue = parse(input)
|
||||||
|
val ts:Long = (json \ "indexed" \ "timestamp").extract[Long]
|
||||||
(json\"indexed"\"timestamp").extractOrElse[Long](0)
|
val doi:String = (json \ "DOI").extract[String]
|
||||||
|
CrossrefDT(doi, input, ts)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
|
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
|
||||||
parser.parseArgument(args)
|
parser.parseArgument(args)
|
||||||
|
@ -49,9 +52,8 @@ object CrossrefDataset {
|
||||||
if (a == null)
|
if (a == null)
|
||||||
return b
|
return b
|
||||||
|
|
||||||
val tb = extractTimestamp(b.json)
|
|
||||||
val ta = extractTimestamp(a.json)
|
if(a.timestamp >b.timestamp) {
|
||||||
if(ta >tb) {
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
b
|
b
|
||||||
|
@ -63,9 +65,7 @@ object CrossrefDataset {
|
||||||
if (a == null)
|
if (a == null)
|
||||||
return b
|
return b
|
||||||
|
|
||||||
val tb = extractTimestamp(b.json)
|
if(a.timestamp >b.timestamp) {
|
||||||
val ta = extractTimestamp(a.json)
|
|
||||||
if(ta >tb) {
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
b
|
b
|
||||||
|
@ -78,15 +78,21 @@ object CrossrefDataset {
|
||||||
override def finish(reduction: CrossrefDT): CrossrefDT = reduction
|
override def finish(reduction: CrossrefDT): CrossrefDT = reduction
|
||||||
}
|
}
|
||||||
|
|
||||||
val sourcePath:String = parser.get("sourcePath")
|
val workingPath:String = parser.get("workingPath")
|
||||||
val targetPath:String = parser.get("targetPath")
|
|
||||||
|
|
||||||
val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT]
|
|
||||||
|
|
||||||
ds.groupByKey(_.doi)
|
val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT]
|
||||||
|
|
||||||
|
|
||||||
|
val update =
|
||||||
|
spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text])
|
||||||
|
.map(i =>CrossrefImporter.decompressBlob(i._2.toString))
|
||||||
|
.map(i =>to_item(i)))
|
||||||
|
|
||||||
|
main_ds.union(update).groupByKey(_.doi)
|
||||||
.agg(crossrefAggregator.toColumn)
|
.agg(crossrefAggregator.toColumn)
|
||||||
.map(s=>s._2)
|
.map(s=>s._2)
|
||||||
.write.mode(SaveMode.Overwrite).save(targetPath)
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,85 +34,21 @@ object SparkMapDumpIntoOAF {
|
||||||
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
|
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
|
||||||
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
|
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
|
||||||
|
|
||||||
val sc = spark.sparkContext
|
|
||||||
val targetPath = parser.get("targetPath")
|
val targetPath = parser.get("targetPath")
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
|
|
||||||
|
|
||||||
spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
|
spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
|
||||||
.flatMap(k => Crossref2Oaf.convert(k.json))
|
.flatMap(k => Crossref2Oaf.convert(k.json))
|
||||||
.filter(o => o != null)
|
.filter(o => o != null)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$targetPath/mixObject")
|
.write.mode(SaveMode.Overwrite).save(s"$targetPath/mixObject")
|
||||||
|
|
||||||
|
|
||||||
val ds:Dataset[Oaf] = spark.read.load(s"$targetPath/mixObject").as[Oaf]
|
val ds:Dataset[Oaf] = spark.read.load(s"$targetPath/mixObject").as[Oaf]
|
||||||
|
|
||||||
ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.save(s"$targetPath/publication")
|
ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefPublication")
|
||||||
|
|
||||||
ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.save(s"$targetPath/relation")
|
ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefRelation")
|
||||||
|
|
||||||
ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.save(s"$targetPath/dataset")
|
ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefDataset")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
|
|
||||||
// .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
|
|
||||||
// .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject")
|
|
||||||
//
|
|
||||||
// val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null)
|
|
||||||
//
|
|
||||||
// val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication])
|
|
||||||
// .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) =>
|
|
||||||
// var r = if (p1 == null) p2 else p1
|
|
||||||
// if (p1 != null && p2 != null) {
|
|
||||||
// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
|
|
||||||
// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
|
|
||||||
// r = p2
|
|
||||||
// else
|
|
||||||
// r = p1
|
|
||||||
// } else {
|
|
||||||
// r = if (p1.getLastupdatetimestamp == null) p2 else p1
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// r
|
|
||||||
// }.map(_._2)
|
|
||||||
//
|
|
||||||
// val pubs:Dataset[Publication] = spark.createDataset(distinctPubs)
|
|
||||||
// pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset])
|
|
||||||
// .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) =>
|
|
||||||
// var r = if (p1 == null) p2 else p1
|
|
||||||
// if (p1 != null && p2 != null) {
|
|
||||||
// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
|
|
||||||
// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
|
|
||||||
// r = p2
|
|
||||||
// else
|
|
||||||
// r = p1
|
|
||||||
// } else {
|
|
||||||
// r = if (p1.getLastupdatetimestamp == null) p2 else p1
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// r
|
|
||||||
// }.map(_._2)
|
|
||||||
//
|
|
||||||
// spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset")
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation])
|
|
||||||
// .map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r))
|
|
||||||
// .reduceByKey { case (p1: Relation, p2: Relation) =>
|
|
||||||
// if (p1 == null) p2 else p1
|
|
||||||
// }.map(_._2)
|
|
||||||
//
|
|
||||||
// val rels: Dataset[Relation] = spark.createDataset(distinctRels)
|
|
||||||
//
|
|
||||||
// rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -16,88 +16,86 @@
|
||||||
<name>sparkExecutorCores</name>
|
<name>sparkExecutorCores</name>
|
||||||
<description>number of cores used by single executor</description>
|
<description>number of cores used by single executor</description>
|
||||||
</property>
|
</property>
|
||||||
<!-- <property>-->
|
<property>
|
||||||
<!-- <name>timestamp</name>-->
|
<name>timestamp</name>
|
||||||
<!-- <description>Timestamp for incremental Harvesting</description>-->
|
<description>Timestamp for incremental Harvesting</description>
|
||||||
<!-- </property>-->
|
</property>
|
||||||
|
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="ExtractCrossrefToOAF"/>
|
<start to="ImportCrossRef"/>
|
||||||
|
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<!-- <action name="ResetWorkingPath">-->
|
<action name="ImportCrossRef">
|
||||||
<!-- <fs>-->
|
<java>
|
||||||
<!-- <delete path='${workingPath}/input/crossref/index_dump'/>-->
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
<!--<!– <mkdir path='${workingPath}/input/crossref'/>–>-->
|
<name-node>${nameNode}</name-node>
|
||||||
<!-- </fs>-->
|
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
|
||||||
<!-- <ok to="ImportCrossRef"/>-->
|
<arg>-t</arg><arg>${workingPath}/input/crossref/index_update</arg>
|
||||||
<!-- <error to="Kill"/>-->
|
<arg>-n</arg><arg>${nameNode}</arg>
|
||||||
<!-- </action>-->
|
<arg>-ts</arg><arg>${timestamp}</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="GenerateDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="GenerateDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>ExtractCrossrefToOAF</name>
|
||||||
|
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>/data/doiboost/input/crossref</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="RenameDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="RenameDataset">
|
||||||
|
<fs>
|
||||||
|
<delete path='${workingPath}/input/crossref/crossref_ds'/>
|
||||||
|
<move source="${workingPath}/input/crossref/crossref_ds_updated"
|
||||||
|
target="${workingPath}/input/crossref/crossref_ds"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="ConvertCrossrefToOAF"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="ConvertCrossrefToOAF">
|
||||||
<!-- <action name="ImportCrossRef">-->
|
|
||||||
<!-- <java>-->
|
|
||||||
<!-- <job-tracker>${jobTracker}</job-tracker>-->
|
|
||||||
<!-- <name-node>${nameNode}</name-node>-->
|
|
||||||
<!-- <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>-->
|
|
||||||
<!-- <arg>-t</arg><arg>${workingPath}/input/crossref/index_dump_1</arg>-->
|
|
||||||
<!-- <arg>-n</arg><arg>${nameNode}</arg>-->
|
|
||||||
<!-- <arg>-ts</arg><arg>${timestamp}</arg>-->
|
|
||||||
<!-- </java>-->
|
|
||||||
<!-- <ok to="End"/>-->
|
|
||||||
<!-- <error to="Kill"/>-->
|
|
||||||
<!-- </action>-->
|
|
||||||
|
|
||||||
|
|
||||||
<action name="ExtractCrossrefToOAF">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>ExtractCrossrefToOAF</name>
|
<name>ConvertCrossrefToOAF</name>
|
||||||
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
|
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorMemory}
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
${sparkExtraOPT}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
|
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
|
||||||
<arg>--targetPath</arg><arg>${workingPath}/input/crossref</arg>
|
<arg>--targetPath</arg><arg>${workingPath}/process/</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<!-- <action name="GenerateDataset">-->
|
|
||||||
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
|
|
||||||
<!-- <master>yarn-cluster</master>-->
|
|
||||||
<!-- <mode>cluster</mode>-->
|
|
||||||
<!-- <name>ExtractCrossrefToOAF</name>-->
|
|
||||||
<!-- <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>-->
|
|
||||||
<!-- <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
|
|
||||||
<!-- <spark-opts>-->
|
|
||||||
<!-- --executor-memory=${sparkExecutorMemory}-->
|
|
||||||
<!-- --executor-cores=${sparkExecutorCores}-->
|
|
||||||
<!-- --driver-memory=${sparkDriverMemory}-->
|
|
||||||
<!-- ${sparkExtraOPT}-->
|
|
||||||
<!-- </spark-opts>-->
|
|
||||||
<!-- <arg>--sourcePath</arg><arg>/data/doiboost/crossref/cr_dataset</arg>-->
|
|
||||||
<!-- <arg>--targetPath</arg><arg>/data/doiboost/crossref/crossrefDataset</arg>-->
|
|
||||||
<!-- <arg>--master</arg><arg>yarn-cluster</arg>-->
|
|
||||||
<!-- </spark>-->
|
|
||||||
<!-- <ok to="End"/>-->
|
|
||||||
<!-- <error to="Kill"/>-->
|
|
||||||
<!-- </action>-->
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
</workflow-app>
|
</workflow-app>
|
|
@ -1,6 +1,5 @@
|
||||||
[
|
[
|
||||||
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
|
||||||
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
|
|
||||||
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
|
||||||
|
|
||||||
]
|
]
|
|
@ -190,15 +190,6 @@ public class CleaningFunctions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<String> collectedFrom = Optional
|
|
||||||
.ofNullable(r.getCollectedfrom())
|
|
||||||
.map(
|
|
||||||
c -> c
|
|
||||||
.stream()
|
|
||||||
.map(KeyValue::getKey)
|
|
||||||
.collect(Collectors.toCollection(HashSet::new)))
|
|
||||||
.orElse(new HashSet<>());
|
|
||||||
|
|
||||||
for (Author a : r.getAuthor()) {
|
for (Author a : r.getAuthor()) {
|
||||||
if (Objects.isNull(a.getPid())) {
|
if (Objects.isNull(a.getPid())) {
|
||||||
a.setPid(Lists.newArrayList());
|
a.setPid(Lists.newArrayList());
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>dhp-workflows</artifactId>
|
||||||
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
|
<version>1.2.4-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>dhp-stats-promote</artifactId>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-core_2.11</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-sql_2.11</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>pl.project13.maven</groupId>
|
||||||
|
<artifactId>git-commit-id-plugin</artifactId>
|
||||||
|
<version>2.1.11</version>
|
||||||
|
<configuration>
|
||||||
|
<failOnNoGitDirectory>false</failOnNoGitDirectory>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
|
@ -0,0 +1,34 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>${jobTracker}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>${nameNode}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_jdbc_url</name>
|
||||||
|
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.wf.workflow.notification.url</name>
|
||||||
|
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>stats_tool_api_url</name>
|
||||||
|
<value>${stats_tool_api_url}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,18 @@
|
||||||
|
export PYTHON_EGG_CACHE=/home/$(whoami)/.python-eggs
|
||||||
|
export link_folder=/tmp/impala-shell-python-egg-cache-$(whoami)
|
||||||
|
if ! [ -L $link_folder ]
|
||||||
|
then
|
||||||
|
rm -Rf "$link_folder"
|
||||||
|
ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder}
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Getting file from " $3
|
||||||
|
hdfs dfs -copyToLocal $3
|
||||||
|
|
||||||
|
echo "Running impala shell make the new database visible"
|
||||||
|
impala-shell -q "INVALIDATE METADATA;"
|
||||||
|
|
||||||
|
echo "Running impala shell to compute new table stats"
|
||||||
|
impala-shell -d $1 -f $2
|
||||||
|
echo "Impala shell finished"
|
||||||
|
rm $2
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
curl --request GET $1/cache/promoteCache
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
------------------------------------------------------
|
||||||
|
------------------------------------------------------
|
||||||
|
-- Impala table statistics - Needed to make the tables
|
||||||
|
-- visible for impala
|
||||||
|
------------------------------------------------------
|
||||||
|
------------------------------------------------------
|
||||||
|
|
||||||
|
INVALIDATE METADATA ${stats_db_name};
|
|
@ -0,0 +1,207 @@
|
||||||
|
------------------------------------------------------
|
||||||
|
------------------------------------------------------
|
||||||
|
-- Shadow schema table exchange
|
||||||
|
------------------------------------------------------
|
||||||
|
------------------------------------------------------
|
||||||
|
|
||||||
|
-- Dropping old views
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.category;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.concept;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.context;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.country;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.countrygdp;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.creation_date;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_citations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_classifications;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_concepts;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_licenses;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_refereed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.dataset_topics;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource_organizations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource_results;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.datasource_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.funder;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.fundref;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.numbers_country;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.organization;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.organization_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.organization_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.organization_projects;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.organization_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_citations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_classifications;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_concepts;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_licenses;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_refereed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.otherresearchproduct_topics;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project_organizations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project_results;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project_resultcount;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.project_results_publication;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_citations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_classifications;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_concepts;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_licenses;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_refereed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.publication_topics;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_affiliated_country;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_citations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_classifications;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_concepts;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_deposited_country;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_fundercount;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_gold;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_greenoa;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_licenses;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_organization;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_peerreviewed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_projectcount;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_projects;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_refereed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.result_topics;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.rndexpediture;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.roarmap;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_citations;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_classifications;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_concepts;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_datasources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_languages;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_licenses;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_oids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_pids;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_refereed;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_sources;
|
||||||
|
DROP VIEW IF EXISTS ${stats_db_production_name}.software_topics;
|
||||||
|
|
||||||
|
|
||||||
|
-- Creating the shadow database, in case it doesn't exist
|
||||||
|
CREATE database IF NOT EXISTS ${stats_db_production_name};
|
||||||
|
|
||||||
|
-- Creating new views
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.category AS SELECT * FROM ${stats_db_name}.category;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.concept AS SELECT * FROM ${stats_db_name}.concept;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.context AS SELECT * FROM ${stats_db_name}.context;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.country AS SELECT * FROM ${stats_db_name}.country;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.countrygdp AS SELECT * FROM ${stats_db_name}.countrygdp;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.creation_date AS SELECT * FROM ${stats_db_name}.creation_date;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset AS SELECT * FROM ${stats_db_name}.dataset;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_citations AS SELECT * FROM ${stats_db_name}.dataset_citations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_classifications AS SELECT * FROM ${stats_db_name}.dataset_classifications;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_concepts AS SELECT * FROM ${stats_db_name}.dataset_concepts;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_datasources AS SELECT * FROM ${stats_db_name}.dataset_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_languages AS SELECT * FROM ${stats_db_name}.dataset_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_licenses AS SELECT * FROM ${stats_db_name}.dataset_licenses;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_oids AS SELECT * FROM ${stats_db_name}.dataset_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_pids AS SELECT * FROM ${stats_db_name}.dataset_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_refereed AS SELECT * FROM ${stats_db_name}.dataset_refereed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_sources AS SELECT * FROM ${stats_db_name}.dataset_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.dataset_topics AS SELECT * FROM ${stats_db_name}.dataset_topics;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource AS SELECT * FROM ${stats_db_name}.datasource;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource_languages AS SELECT * FROM ${stats_db_name}.datasource_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource_oids AS SELECT * FROM ${stats_db_name}.datasource_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource_organizations AS SELECT * FROM ${stats_db_name}.datasource_organizations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource_results AS SELECT * FROM ${stats_db_name}.datasource_results;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.datasource_sources AS SELECT * FROM ${stats_db_name}.datasource_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.funder AS SELECT * FROM ${stats_db_name}.funder;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.fundref AS SELECT * FROM ${stats_db_name}.fundref;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.numbers_country AS SELECT * FROM ${stats_db_name}.numbers_country;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.organization AS SELECT * FROM ${stats_db_name}.organization;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.organization_datasources AS SELECT * FROM ${stats_db_name}.organization_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.organization_pids AS SELECT * FROM ${stats_db_name}.organization_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.organization_projects AS SELECT * FROM ${stats_db_name}.organization_projects;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.organization_sources AS SELECT * FROM ${stats_db_name}.organization_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct AS SELECT * FROM ${stats_db_name}.otherresearchproduct;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_citations AS SELECT * FROM ${stats_db_name}.otherresearchproduct_citations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_classifications AS SELECT * FROM ${stats_db_name}.otherresearchproduct_classifications;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_concepts AS SELECT * FROM ${stats_db_name}.otherresearchproduct_concepts;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_datasources AS SELECT * FROM ${stats_db_name}.otherresearchproduct_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_languages AS SELECT * FROM ${stats_db_name}.otherresearchproduct_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_licenses AS SELECT * FROM ${stats_db_name}.otherresearchproduct_licenses;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_oids AS SELECT * FROM ${stats_db_name}.otherresearchproduct_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_pids AS SELECT * FROM ${stats_db_name}.otherresearchproduct_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_refereed AS SELECT * FROM ${stats_db_name}.otherresearchproduct_refereed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_sources AS SELECT * FROM ${stats_db_name}.otherresearchproduct_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.otherresearchproduct_topics AS SELECT * FROM ${stats_db_name}.otherresearchproduct_topics;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project AS SELECT * FROM ${stats_db_name}.project;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project_oids AS SELECT * FROM ${stats_db_name}.project_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project_organizations AS SELECT * FROM ${stats_db_name}.project_organizations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project_results AS SELECT * FROM ${stats_db_name}.project_results;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project_resultcount AS SELECT * FROM ${stats_db_name}.project_resultcount;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.project_results_publication AS SELECT * FROM ${stats_db_name}.project_results_publication;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication AS SELECT * FROM ${stats_db_name}.publication;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_citations AS SELECT * FROM ${stats_db_name}.publication_citations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_classifications AS SELECT * FROM ${stats_db_name}.publication_classifications;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_concepts AS SELECT * FROM ${stats_db_name}.publication_concepts;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_datasources AS SELECT * FROM ${stats_db_name}.publication_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_languages AS SELECT * FROM ${stats_db_name}.publication_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_licenses AS SELECT * FROM ${stats_db_name}.publication_licenses;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_oids AS SELECT * FROM ${stats_db_name}.publication_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_pids AS SELECT * FROM ${stats_db_name}.publication_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_refereed AS SELECT * FROM ${stats_db_name}.publication_refereed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_sources AS SELECT * FROM ${stats_db_name}.publication_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.publication_topics AS SELECT * FROM ${stats_db_name}.publication_topics;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result AS SELECT * FROM ${stats_db_name}.result;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_affiliated_country AS SELECT * FROM ${stats_db_name}.result_affiliated_country;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_citations AS SELECT * FROM ${stats_db_name}.result_citations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_classifications AS SELECT * FROM ${stats_db_name}.result_classifications;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_concepts AS SELECT * FROM ${stats_db_name}.result_concepts;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_datasources AS SELECT * FROM ${stats_db_name}.result_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_deposited_country AS SELECT * FROM ${stats_db_name}.result_deposited_country;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_fundercount AS SELECT * FROM ${stats_db_name}.result_fundercount;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_gold AS SELECT * FROM ${stats_db_name}.result_gold;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_greenoa AS SELECT * FROM ${stats_db_name}.result_greenoa;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_languages AS SELECT * FROM ${stats_db_name}.result_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_licenses AS SELECT * FROM ${stats_db_name}.result_licenses;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_oids AS SELECT * FROM ${stats_db_name}.result_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_organization AS SELECT * FROM ${stats_db_name}.result_organization;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_peerreviewed AS SELECT * FROM ${stats_db_name}.result_peerreviewed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_pids AS SELECT * FROM ${stats_db_name}.result_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_projectcount AS SELECT * FROM ${stats_db_name}.result_projectcount;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_projects AS SELECT * FROM ${stats_db_name}.result_projects;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_refereed AS SELECT * FROM ${stats_db_name}.result_refereed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_sources AS SELECT * FROM ${stats_db_name}.result_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.result_topics AS SELECT * FROM ${stats_db_name}.result_topics;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.rndexpediture AS SELECT * FROM ${stats_db_name}.rndexpediture;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.roarmap AS SELECT * FROM ${stats_db_name}.roarmap;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software AS SELECT * FROM ${stats_db_name}.software;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_citations AS SELECT * FROM ${stats_db_name}.software_citations;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_classifications AS SELECT * FROM ${stats_db_name}.software_classifications;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_concepts AS SELECT * FROM ${stats_db_name}.software_concepts;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_datasources AS SELECT * FROM ${stats_db_name}.software_datasources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_languages AS SELECT * FROM ${stats_db_name}.software_languages;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_licenses AS SELECT * FROM ${stats_db_name}.software_licenses;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_oids AS SELECT * FROM ${stats_db_name}.software_oids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_pids AS SELECT * FROM ${stats_db_name}.software_pids;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_refereed AS SELECT * FROM ${stats_db_name}.software_refereed;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_sources AS SELECT * FROM ${stats_db_name}.software_sources;
|
||||||
|
CREATE VIEW IF NOT EXISTS ${stats_db_production_name}.software_topics AS SELECT * FROM ${stats_db_name}.software_topics;
|
|
@ -0,0 +1,87 @@
|
||||||
|
<workflow-app name="Graph Stats" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>stats_db_name</name>
|
||||||
|
<description>the target stats database name</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>stats_db_production_name</name>
|
||||||
|
<description>the name of the production schema</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>stats_tool_api_url</name>
|
||||||
|
<description>The url of the API of the stats tool. Is used to trigger the cache promote.</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<description>hive server metastore URIs</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_jdbc_url</name>
|
||||||
|
<description>hive server jdbc url</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_timeout</name>
|
||||||
|
<description>the time period, in seconds, after which Hive fails a transaction if a Hive client has not sent a hearbeat. The default value is 300 seconds.</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>hive.metastore.uris</name>
|
||||||
|
<value>${hive_metastore_uris}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive.txn.timeout</name>
|
||||||
|
<value>${hive_timeout}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="updateProductionViews"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="updateProductionViews">
|
||||||
|
<hive2 xmlns="uri:oozie:hive2-action:0.1">
|
||||||
|
<jdbc-url>${hive_jdbc_url}</jdbc-url>
|
||||||
|
<script>scripts/updateProductionViews.sql</script>
|
||||||
|
<param>stats_db_name=${stats_db_name}</param>
|
||||||
|
<param>stats_db_production_name=${stats_db_production_name}</param>
|
||||||
|
</hive2>
|
||||||
|
<ok to="computeProductionStats"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="computeProductionStats">
|
||||||
|
<shell xmlns="uri:oozie:shell-action:0.1">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<exec>impala-shell.sh</exec>
|
||||||
|
<argument>${stats_db_production_name}</argument>
|
||||||
|
<argument>computeProductionStats.sql</argument>
|
||||||
|
<argument>${wf:appPath()}/scripts/computeProductionStats.sql</argument>
|
||||||
|
<file>impala-shell.sh</file>
|
||||||
|
</shell>
|
||||||
|
<ok to="promoteCache"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="promoteCache">
|
||||||
|
<shell xmlns="uri:oozie:shell-action:0.1">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<exec>promoteCache.sh</exec>
|
||||||
|
<argument>${stats_tool_api_url}</argument>
|
||||||
|
<file>promoteCache.sh</file>
|
||||||
|
</shell>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -27,4 +27,8 @@
|
||||||
<name>oozie.wf.workflow.notification.url</name>
|
<name>oozie.wf.workflow.notification.url</name>
|
||||||
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
|
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>stats_tool_api_url</name>
|
||||||
|
<value>${stats_tool_api_url}</value>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
|
@ -11,7 +11,7 @@ CREATE TABLE ${stats_db_name}.project_oids AS SELECT substr(p.id, 4) AS id, oids
|
||||||
|
|
||||||
-- Project_organizations Table
|
-- Project_organizations Table
|
||||||
DROP TABLE IF EXISTS ${stats_db_name}.project_organizations;
|
DROP TABLE IF EXISTS ${stats_db_name}.project_organizations;
|
||||||
CREATE TABLE ${stats_db_name}.project_organizations AS SELECT substr(r.source, 4) AS id, substr(r.target, 4) AS organization from ${openaire_db_name}.relation r WHERE r.reltype='projectOrganization';
|
CREATE TABLE ${stats_db_name}.project_organizations AS SELECT substr(r.source, 4) AS id, substr(r.target, 4) AS organization from ${openaire_db_name}.relation r WHERE r.reltype='projectOrganization' and r.datainfo.deletedbyinference=false;
|
||||||
|
|
||||||
-- Project_results Table
|
-- Project_results Table
|
||||||
DROP TABLE IF EXISTS ${stats_db_name}.project_results;
|
DROP TABLE IF EXISTS ${stats_db_name}.project_results;
|
||||||
|
|
|
@ -25,7 +25,7 @@ CREATE OR REPLACE VIEW ${stats_db_name}.result_pids AS SELECT * FROM ${stats_db_
|
||||||
CREATE OR REPLACE VIEW ${stats_db_name}.result_topics AS SELECT * FROM ${stats_db_name}.publication_topics UNION ALL SELECT * FROM ${stats_db_name}.software_topics UNION ALL SELECT * FROM ${stats_db_name}.dataset_topics UNION ALL SELECT * FROM ${stats_db_name}.otherresearchproduct_topics;
|
CREATE OR REPLACE VIEW ${stats_db_name}.result_topics AS SELECT * FROM ${stats_db_name}.publication_topics UNION ALL SELECT * FROM ${stats_db_name}.software_topics UNION ALL SELECT * FROM ${stats_db_name}.dataset_topics UNION ALL SELECT * FROM ${stats_db_name}.otherresearchproduct_topics;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS ${stats_db_name}.result_organization;
|
DROP TABLE IF EXISTS ${stats_db_name}.result_organization;
|
||||||
CREATE TABLE ${stats_db_name}.result_organization AS SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS organization FROM ${openaire_db_name}.relation r WHERE r.reltype='resultOrganization';
|
CREATE TABLE ${stats_db_name}.result_organization AS SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS organization FROM ${openaire_db_name}.relation r WHERE r.reltype='resultOrganization' and r.datainfo.deletedbyinference=false;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS ${stats_db_name}.result_projects;
|
DROP TABLE IF EXISTS ${stats_db_name}.result_projects;
|
||||||
CREATE TABLE ${stats_db_name}.result_projects AS select pr.result AS id, pr.id AS project, datediff(p.enddate, p.startdate) AS daysfromend FROM ${stats_db_name}.result r JOIN ${stats_db_name}.project_results pr ON r.id=pr.result JOIN ${stats_db_name}.project_tmp p ON p.id=pr.id;
|
CREATE TABLE ${stats_db_name}.result_projects AS select pr.result AS id, pr.id AS project, datediff(p.enddate, p.startdate) AS daysfromend FROM ${stats_db_name}.result r JOIN ${stats_db_name}.project_results pr ON r.id=pr.result JOIN ${stats_db_name}.project_tmp p ON p.id=pr.id;
|
||||||
|
|
|
@ -47,7 +47,7 @@ DROP TABLE IF EXISTS ${stats_db_name}.datasource_oids;
|
||||||
CREATE TABLE ${stats_db_name}.datasource_oids AS SELECT substr(d.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.datasource d LATERAL VIEW explode(d.originalid) oids AS ids;
|
CREATE TABLE ${stats_db_name}.datasource_oids AS SELECT substr(d.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.datasource d LATERAL VIEW explode(d.originalid) oids AS ids;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS ${stats_db_name}.datasource_organizations;
|
DROP TABLE IF EXISTS ${stats_db_name}.datasource_organizations;
|
||||||
CREATE TABLE ${stats_db_name}.datasource_organizations AS SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS organization FROM ${openaire_db_name}.relation r WHERE r.reltype='datasourceOrganization';
|
CREATE TABLE ${stats_db_name}.datasource_organizations AS SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS organization FROM ${openaire_db_name}.relation r WHERE r.reltype='datasourceOrganization' and r.datainfo.deletedbyinference=false;
|
||||||
|
|
||||||
-- datasource sources:
|
-- datasource sources:
|
||||||
-- where the datasource info have been collected from.
|
-- where the datasource info have been collected from.
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
curl --request GET $1/cache/updateCache
|
||||||
|
|
|
@ -17,6 +17,10 @@
|
||||||
<name>stats_db_shadow_name</name>
|
<name>stats_db_shadow_name</name>
|
||||||
<description>the name of the shadow schema</description>
|
<description>the name of the shadow schema</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>stats_tool_api_url</name>
|
||||||
|
<description>The url of the API of the stats tool. Is used to trigger the cache update.</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hive_metastore_uris</name>
|
<name>hive_metastore_uris</name>
|
||||||
<description>hive server metastore URIs</description>
|
<description>hive server metastore URIs</description>
|
||||||
|
@ -255,7 +259,7 @@
|
||||||
<action name="Step17">
|
<action name="Step17">
|
||||||
<hive2 xmlns="uri:oozie:hive2-action:0.1">
|
<hive2 xmlns="uri:oozie:hive2-action:0.1">
|
||||||
<jdbc-url>${hive_jdbc_url}</jdbc-url>
|
<jdbc-url>${hive_jdbc_url}</jdbc-url>
|
||||||
<script>scripts/step17.sql</script>
|
<script>scripts/updateProductionViews.sql</script>
|
||||||
<param>stats_db_name=${stats_db_name}</param>
|
<param>stats_db_name=${stats_db_name}</param>
|
||||||
<param>stats_db_shadow_name=${stats_db_shadow_name}</param>
|
<param>stats_db_shadow_name=${stats_db_shadow_name}</param>
|
||||||
</hive2>
|
</hive2>
|
||||||
|
@ -283,10 +287,22 @@
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<exec>impala-shell.sh</exec>
|
<exec>impala-shell.sh</exec>
|
||||||
<argument>${stats_db_shadow_name}</argument>
|
<argument>${stats_db_shadow_name}</argument>
|
||||||
<argument>step19.sql</argument>
|
<argument>computeProductionStats.sql</argument>
|
||||||
<argument>${wf:appPath()}/scripts/step19.sql</argument>
|
<argument>${wf:appPath()}/scripts/computeProductionStats.sql</argument>
|
||||||
<file>impala-shell.sh</file>
|
<file>impala-shell.sh</file>
|
||||||
</shell>
|
</shell>
|
||||||
|
<ok to="Step20"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="Step20">
|
||||||
|
<shell xmlns="uri:oozie:shell-action:0.1">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<exec>updateCache.sh</exec>
|
||||||
|
<argument>${stats_tool_api_url}</argument>
|
||||||
|
<file>updateCache.sh</file>
|
||||||
|
</shell>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
<groupId>eu.dnetlib.dhp</groupId>
|
<groupId>eu.dnetlib.dhp</groupId>
|
||||||
<artifactId>dhp</artifactId>
|
<artifactId>dhp</artifactId>
|
||||||
<version>1.2.4-SNAPSHOT</version>
|
<version>1.2.4-SNAPSHOT</version>
|
||||||
<relativePath>../</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<artifactId>dhp-workflows</artifactId>
|
<artifactId>dhp-workflows</artifactId>
|
||||||
|
|
Loading…
Reference in New Issue