From 1d80c1da57a87328ccfb6dc9692e92db3f4dcb22 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 25 Oct 2024 15:09:52 +0200 Subject: [PATCH] Implement new jobs for collecting data from latest graph on hive and deltas from oaf mdstores (datacite and crossref) Optimized CopyHdfsOafSparkApplication --- .../SparkCountryPropagationJob.java | 16 +- .../raw/CopyHdfsOafSparkApplication.scala | 70 ++++---- .../raw/CopyHdfsOafSparkApplicationTest.java | 73 ++------ dhp-workflows/dhp-incremental-graph/pom.xml | 72 ++++++++ .../dhp/incremental/CollectNewOafResults.java | 161 ++++++++++++++++++ .../SparkResolveRelationById.scala | 89 ++++++++++ .../collectnewresults_input_parameters.json | 44 +++++ .../collect/oozie_app/config-default.xml | 23 +++ .../collect/oozie_app/workflow.xml | 65 +++++++ .../migrate/oozie_app/config-default.xml | 26 +++ .../migrate/oozie_app/migration.sql | 11 ++ .../migrate/oozie_app/workflow.xml | 66 +++++++ .../resolution/oozie_app/config-default.xml | 23 +++ .../resolution/oozie_app/workflow.xml | 138 +++++++++++++++ .../resolve_relationsbyid_params.json | 32 ++++ dhp-workflows/pom.xml | 1 + 16 files changed, 813 insertions(+), 97 deletions(-) create mode 100644 dhp-workflows/dhp-incremental-graph/pom.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 4e174fddc..936bdba1d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -12,11 +12,7 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,14 +81,16 @@ public class SparkCountryPropagationJob { Dataset res = readPath(spark, sourcePath, resultClazz); log.info("Reading prepared info: {}", preparedInfoPath); - final Dataset preparedInfoRaw = spark + Encoder rcsEncoder = Encoders.bean(ResultCountrySet.class); + final Dataset preparedInfoRaw = spark .read() - .json(preparedInfoPath); + .schema(rcsEncoder.schema()) + .json(preparedInfoPath) + .as(rcsEncoder); if (!preparedInfoRaw.isEmpty()) { - final Dataset prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class)); res - .joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer") + .joinWith(preparedInfoRaw, res.col("id").equalTo(prepared.col("resultId")), "left_outer") .map(getCountryMergeFn(), Encoders.bean(resultClazz)) .write() .option("compression", "gzip") diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala index 9d7cca7dd..1177b34f4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala @@ -1,12 +1,10 @@ package eu.dnetlib.dhp.oa.graph.raw -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.common.HdfsSupport import eu.dnetlib.dhp.schema.common.ModelSupport -import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.utils.DHPUtils -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.{Encoders, SaveMode, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -54,48 +52,60 @@ object CopyHdfsOafSparkApplication { val hdfsPath = parser.get("hdfsPath") log.info("hdfsPath: {}", hdfsPath) - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList - val types = ModelSupport.oafTypes.entrySet.asScala - .map(e => Tuple2(e.getKey, e.getValue)) - if (validPaths.nonEmpty) { - val oaf = spark.read.textFile(validPaths: _*) - val mapper = - new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val oaf = spark.read + .textFile(validPaths: _*) + .map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .cache() - types.foreach(t => - oaf - .filter(o => isOafType(o, t._1)) - .map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf]) - .map(s => mapper.writeValueAsString(s))(Encoders.STRING) - .write - .option("compression", "gzip") - .mode(SaveMode.Append) - .text(s"$hdfsPath/${t._1}") - ) + try { + ModelSupport.oafTypes + .keySet() + .asScala + .foreach(entity => + oaf + .filter(s"_1 = '${entity}'") + .selectExpr("_2") + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(s"$hdfsPath/${entity}") + ) + } finally { + oaf.unpersist() + } } } - def isOafType(input: String, oafType: String): Boolean = { + def getOafType(input: String): String = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: org.json4s.JValue = parse(input) - if (oafType == "relation") { - val hasSource = (json \ "source").extractOrElse[String](null) - val hasTarget = (json \ "target").extractOrElse[String](null) - hasSource != null && hasTarget != null + val hasId = (json \ "id").extractOrElse[String](null) + val hasSource = (json \ "source").extractOrElse[String](null) + val hasTarget = (json \ "target").extractOrElse[String](null) + + if (hasId == null && hasSource != null && hasTarget != null) { + "relation" + } else if (hasId != null) { + val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2)) + + oafType match { + case "result" => + (json \ "resulttype" \ "classid").extractOrElse[String](null) match { + case "other" => "otherresearchproduct" + case any => any + } + case _ => oafType + } } else { - val hasId = (json \ "id").extractOrElse[String](null) - val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("") - hasId != null && oafType.startsWith(resultType) + null } - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java index 3c6fc9c7f..16407c65f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java @@ -1,8 +1,8 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.IOException; @@ -11,67 +11,24 @@ import org.junit.jupiter.api.Test; public class CopyHdfsOafSparkApplicationTest { + String getResourceAsStream(String path) throws IOException { + return IOUtils.toString(getClass().getResourceAsStream(path)); + } + @Test void testIsOafType() throws IOException { - assertTrue( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")), - "publication")); - assertTrue( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")), - "dataset")); - assertTrue( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")), - "relation")); - - assertFalse( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")), - "dataset")); - assertFalse( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")), - "publication")); - - assertTrue( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass() - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")), - "publication")); + assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json"))); + assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json"))); + assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json"))); + assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json"))); + assertEquals( + "publication", + getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json"))); } @Test void isOafType_Datacite_ORP() throws IOException { - assertTrue( - CopyHdfsOafSparkApplication - .isOafType( - IOUtils - .toString( - getClass() - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")), - "otherresearchproduct")); + assertEquals( + "otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json"))); } - } diff --git a/dhp-workflows/dhp-incremental-graph/pom.xml b/dhp-workflows/dhp-incremental-graph/pom.xml new file mode 100644 index 000000000..3979da15b --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-incremental-graph + + + + net.alchim31.maven + scala-maven-plugin + ${net.alchim31.maven.version} + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + scala-doc + process-resources + + doc + + + + + true + ${scala.binary.version} + ${scala.version} + + + + + + + + + eu.dnetlib.dhp + dhp-aggregation + ${project.version} + + + eu.dnetlib.dhp + dhp-graph-mapper + ${project.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java new file mode 100644 index 000000000..ea9131cf1 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java @@ -0,0 +1,161 @@ + +package eu.dnetlib.dhp.incremental; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static org.apache.spark.sql.functions.udf; + +import java.util.Collections; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.types.DataTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.rest.DNetRestClient; +import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication; +import eu.dnetlib.dhp.oozie.RunSQLSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion; +import scala.collection.JavaConversions; + +public class CollectNewOafResults { + private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); + + private final ArgumentApplicationParser parser; + + public CollectNewOafResults(ArgumentApplicationParser parser) { + this.parser = parser; + } + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + CollectNewOafResults.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String wrkdirPath = parser.get("workingDir"); + log.info("workingDir is {}", wrkdirPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath is {}", outputPath); + + final String mdStoreManagerURI = parser.get("mdStoreManagerURI"); + log.info("mdStoreManagerURI is {}", mdStoreManagerURI); + + final String mdStoreID = parser.get("mdStoreID"); + if (StringUtils.isBlank(mdStoreID)) { + throw new IllegalArgumentException("missing or empty argument mdStoreID"); + } + + final String hiveDbName = parser.get("hiveDbName"); + log.info("hiveDbName is {}", hiveDbName); + + final MDStoreVersion currentVersion = DNetRestClient + .doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class); + + log.info("mdstore data is {}", currentVersion.toString()); + + try { + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + // ids in the current graph + Dataset currentIds = spark + .table(hiveDbName + ".result") + .select("id") + .union( + spark + .table(hiveDbName + ".relation") + .where("relClass = 'merges'") + .selectExpr("target as id")) + .distinct(); + + UserDefinedFunction getOafType = udf( + (String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType); + + // new collected ids + spark + .read() + .text(currentVersion.getHdfsPath() + "/store") + .selectExpr( + "value", + "get_json_object(value, '$.id') AS id") + .where("id IS NOT NULL") + .join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti") + .withColumn("oaftype", getOafType.apply(new Column("value"))) + .write() + .partitionBy("oaftype") + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .parquet(wrkdirPath + "/entities"); + + ModelSupport.oafTypes + .keySet() + .forEach( + entity -> spark + .read() + .parquet(wrkdirPath + "/entities") + .filter("oaftype = '" + entity + "'") + .select("value") + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(outputPath + "/" + entity)); + + Dataset newIds = spark.read().parquet(wrkdirPath + "/entities").select("id"); + + Dataset rels = spark + .read() + .text(currentVersion.getHdfsPath() + "/store") + .selectExpr( + "value", + "get_json_object(value, '$.source') AS source", + "get_json_object(value, '$.target') AS target") + .where("source IS NOT NULL AND target IS NOT NULL"); + + rels + .join( + newIds.selectExpr("id as source"), + JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi") + .union( + rels + .join( + newIds.selectExpr("id as target"), + JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi")) + .distinct() + .select("value") + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(outputPath + "/relation"); + }); + } finally { + DNetRestClient + .doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId())); + } + } +} diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala new file mode 100644 index 000000000..44f09bbce --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala @@ -0,0 +1,89 @@ +package eu.dnetlib.dhp.incremental + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Relation +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{col, expr} +import org.slf4j.{Logger, LoggerFactory} + +object SparkResolveRelationById { + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + + val parser = new ArgumentApplicationParser( + IOUtils.toString( + getClass.getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json" + ) + ) + ) + parser.parseArgument(args) + conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")) + + val graphBasePath = parser.get("graphBasePath") + log.info(s"graphBasePath -> $graphBasePath") + val relationPath = parser.get("relationPath") + log.info(s"relationPath -> $relationPath") + val targetPath = parser.get("targetGraph") + log.info(s"targetGraph -> $targetPath") + + val hiveDbName = parser.get("hiveDbName") + log.info(s"hiveDbName -> $hiveDbName") + + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .enableHiveSupport() + .appName(getClass.getSimpleName) + .getOrCreate() + + implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation]) + + val mergedrels = + spark.table(s"${hiveDbName}.relation").where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId") + + spark.read + .schema(Encoders.bean(classOf[Relation]).schema) + .json(s"$graphBasePath/relation") + .as[Relation] + .map(r => resolveRelations(r)) + .join(mergedrels, col("source") === mergedrels.col("mergedId"), "left") + .withColumn("source", expr("coalesce(dedupId, source)")) + .drop("mergedId", "dedupID") + .join(mergedrels, col("target") === mergedrels.col("mergedId"), "left") + .withColumn("target", expr("coalesce(dedupId, target)")) + .drop("mergedId", "dedupID") + .write + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(s"$targetPath/relation") + } + + private def resolveRelations(r: Relation): Relation = { + if (r.getSource.startsWith("unresolved::")) + r.setSource(resolvePid(r.getSource.substring(12))) + + if (r.getTarget.startsWith("unresolved::")) + r.setTarget(resolvePid(r.getTarget.substring(12))) + + r + } + + private def resolvePid(str: String): String = { + val parts = str.split("::") + val id = parts(0) + val scheme: String = parts.last match { + case "arxiv" => "arXiv" + case _ => parts.last + } + + IdentifierFactory.idFromPid("50", scheme, id, true) + } + +} diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json new file mode 100644 index 000000000..ef0838fc9 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json @@ -0,0 +1,44 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "mu", + "paramLongName": "mdStoreManagerURI", + "paramDescription": "the MDStore Manager URI", + "paramRequired": true + }, + { + "paramName": "mi", + "paramLongName": "mdStoreID", + "paramDescription": "the Metadata Store ID", + "paramRequired": false + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the graph hive database name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml new file mode 100644 index 000000000..c8cf80b72 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml new file mode 100644 index 000000000..7bc610df2 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml @@ -0,0 +1,65 @@ + + + + mdStoreManagerURI + the path of the cleaned mdstore + + + mdStoreID + the identifier of the native MDStore + + + outputPath + outputDirectory + + + workingDir + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + hiveDbName + hive database containing last generated graph + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Collect New Oaf Results + eu.dnetlib.dhp.incremental.CollectNewOafResults + dhp-incremental-graph-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=1024 + + --outputPath${outputPath} + --workingDir${workingDir}/collect_new_results + --mdStoreID${mdStoreID} + --mdStoreManagerURI${mdStoreManagerURI} + --hiveMetastoreUris${hiveMetastoreUris} + --hiveDbName${hiveDbName} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml new file mode 100644 index 000000000..17bb70647 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql new file mode 100644 index 000000000..c0e1fd87a --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql @@ -0,0 +1,11 @@ +INSERT OVERWRITE DIRECTORY '${outputPath}/datasource' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */ + +INSERT OVERWRITE DIRECTORY '${outputPath}/organization' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */ + +INSERT OVERWRITE DIRECTORY '${outputPath}/project' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`project`; /* EOS */ \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml new file mode 100644 index 000000000..85e398d9b --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml @@ -0,0 +1,66 @@ + + + + outputPath + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + sparkSqlWarehouseDir + + + hiveDbName + hive database containing last generated graph + + + + sparkClusterOpts + --conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=1024 + spark resource options + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Copy data from last graph + eu.dnetlib.dhp.oozie.RunSQLSparkJob + dhp-incremental-graph-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --hiveMetastoreUris${hiveMetastoreUris} + --sqleu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql + --hiveDbName${hiveDbName} + --outputPath${outputPath} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml new file mode 100644 index 000000000..c8cf80b72 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml new file mode 100644 index 000000000..15ffc2342 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml @@ -0,0 +1,138 @@ + + + + graphBasePath + input graph to resolve + + + targetGraph + outputDirectory + + + workingDir + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + hiveDbName + hive database containing last generated graph + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn + cluster + Resolve Relations + eu.dnetlib.dhp.incremental.SparkResolveRelationById + dhp-incremental-graph-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --graphBasePath${graphBasePath} + --targetGraph${targetGraph} + --workingDir${workingDir}/resolve_relation + --hiveMetastoreUris${hiveMetastoreUris} + --hiveDbName${hiveDbName} + + + + + + + + + + + + + + + + + + + + + + ${nameNode}/${graphBasePath}/dataset + ${nameNode}/${targetGraph}/dataset + + + + + + + ${nameNode}/${graphBasePath}/datasource + ${nameNode}/${targetGraph}/datasource + + + + + + + ${nameNode}/${graphBasePath}/organization + ${nameNode}/${targetGraph}/organization + + + + + + + ${nameNode}/${graphBasePath}/otherresearchproduct + ${nameNode}/${targetGraph}/otherresearchproduct + + + + + + + ${nameNode}/${graphBasePath}/person + ${nameNode}/${targetGraph}/person + + + + + + + ${nameNode}/${graphBasePath}/project + ${nameNode}/${targetGraph}/project + + + + + + + ${nameNode}/${graphBasePath}/publication + ${nameNode}/${targetGraph}/publication + + + + + + + ${nameNode}/${graphBasePath}/software + ${nameNode}/${targetGraph}/software + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json new file mode 100644 index 000000000..481bd874d --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "g", + "paramLongName": "graphBasePath", + "paramDescription": "the path of the raw graph", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetGraph", + "paramDescription": "the target path", + "paramRequired": true + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the graph hive database name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 1c331d126..4c3f7509e 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -43,6 +43,7 @@ dhp-doiboost dhp-impact-indicators dhp-swh + dhp-incremental-graph