diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
index a0cc4c84a6..8e9207966a 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
@@ -12,10 +12,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,10 +81,12 @@ public class SparkCountryPropagationJob {
 		Dataset<R> res = readPath(spark, sourcePath, resultClazz);
 
 		log.info("Reading prepared info: {}", preparedInfoPath);
+		Encoder<ResultCountrySet> rcsEncoder = Encoders.bean(ResultCountrySet.class);
 		Dataset<ResultCountrySet> prepared = spark
 			.read()
+			.schema(rcsEncoder.schema())
 			.json(preparedInfoPath)
-			.as(Encoders.bean(ResultCountrySet.class));
+			.as(rcsEncoder);
 
 		res
 			.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
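The hunk above reads the prepared-info JSON with the schema taken from the bean encoder, so Spark skips the schema-inference pass and fields declared in ResultCountrySet but absent from the input resolve to null instead of making the .as(...) conversion fail. A minimal sketch of the same pattern, with a hypothetical PreparedInfo bean and input path that are not part of this change:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class SchemaFromEncoderSketch {

	// stand-in for ResultCountrySet: any JavaBean with getters/setters works with Encoders.bean
	public static class PreparedInfo {
		private String resultId;

		public String getResultId() {
			return resultId;
		}

		public void setResultId(String resultId) {
			this.resultId = resultId;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		Encoder<PreparedInfo> encoder = Encoders.bean(PreparedInfo.class);
		Dataset<PreparedInfo> prepared = spark
			.read()
			.schema(encoder.schema()) // explicit schema: no inference pass, absent fields become null
			.json("/tmp/prepared_info") // hypothetical input path
			.as(encoder);

		prepared.show();
	}
}
```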
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java
new file mode 100644
index 0000000000..85db998555
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableExporterJob.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.oa.graph.hive;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+public class GraphHiveTableExporterJob {
+
+	private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);
+
+	public static void main(String[] args) throws Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					GraphHiveTableExporterJob.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(-1);
+		log.info("numPartitions: {}", numPartitions);
+
+		String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		String hiveTableName = parser.get("hiveTableName");
+		log.info("hiveTableName: {}", hiveTableName);
+
+		String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+		log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
+		String mode = parser.get("mode");
+		log.info("mode: {}", mode);
+
+		SparkConf conf = new SparkConf();
+		conf.set("hive.metastore.uris", hiveMetastoreUris);
+
+		runWithSparkHiveSession(
+			conf, isSparkSessionManaged,
+			spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
+	}
+
+	// protected for testing
+	protected static void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
+		String mode, int numPartitions) {
+
+		Dataset<Row> dataset = spark.table(hiveTableName);
+
+		if (numPartitions > 0) {
+			log.info("repartitioning to {} partitions", numPartitions);
+			dataset = dataset.repartition(numPartitions);
+		}
+
+		dataset
+			.write()
+			.mode(mode)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+}
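GraphHiveTableExporterJob dumps a single Hive table to gzip-compressed JSON text, either appending to or overwriting the target directory. A hypothetical stand-alone invocation with placeholder values for the arguments declared in hive_db_exporter_parameters.json (the workflow normally passes these via Oozie, and running it outside the cluster would additionally require a Spark master and a reachable metastore):

```java
public class ExportPublicationTable {

	public static void main(String[] args) throws Exception {
		// all values below are placeholders
		eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob.main(new String[] {
			"--isSparkSessionManaged", "true",
			"--outputPath", "/tmp/incremental_graph/publication",
			"--mode", "append",
			"--hiveTableName", "openaire.publication",
			"--hiveMetastoreUris", "thrift://example-metastore:9083"
		});
	}
}
```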
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json
new file mode 100644
index 0000000000..548d75a3d1
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json
@@ -0,0 +1,32 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path where the hive table content will be exported",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mode",
+    "paramLongName": "mode",
+    "paramDescription": "mode (append|overwrite)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "hmu",
+    "paramLongName": "hiveMetastoreUris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName": "db",
+    "paramLongName": "hiveTableName",
+    "paramDescription": "the input hive table identifier",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
index 9d7cca7dd2..1177b34f44 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
@@ -1,12 +1,10 @@
 package eu.dnetlib.dhp.oa.graph.raw
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.common.HdfsSupport
 import eu.dnetlib.dhp.schema.common.ModelSupport
-import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.utils.DHPUtils
-import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
@@ -54,48 +52,60 @@
     val hdfsPath = parser.get("hdfsPath")
     log.info("hdfsPath: {}", hdfsPath)
 
-    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
-
     val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
 
     val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
 
-    val types = ModelSupport.oafTypes.entrySet.asScala
-      .map(e => Tuple2(e.getKey, e.getValue))
-
     if (validPaths.nonEmpty) {
-      val oaf = spark.read.textFile(validPaths: _*)
-      val mapper =
-        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+      val oaf = spark.read
+        .textFile(validPaths: _*)
+        .map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
+        .cache()
 
-      types.foreach(t =>
-        oaf
-          .filter(o => isOafType(o, t._1))
-          .map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf])
-          .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
-          .write
-          .option("compression", "gzip")
-          .mode(SaveMode.Append)
-          .text(s"$hdfsPath/${t._1}")
-      )
+      try {
+        ModelSupport.oafTypes
+          .keySet()
+          .asScala
+          .foreach(entity =>
+            oaf
+              .filter(s"_1 = '${entity}'")
+              .selectExpr("_2")
+              .write
+              .option("compression", "gzip")
+              .mode(SaveMode.Append)
+              .text(s"$hdfsPath/${entity}")
+          )
+      } finally {
+        oaf.unpersist()
+      }
     }
   }
 
-  def isOafType(input: String, oafType: String): Boolean = {
+  def getOafType(input: String): String = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     lazy val json: org.json4s.JValue = parse(input)
 
-    if (oafType == "relation") {
-      val hasSource = (json \ "source").extractOrElse[String](null)
-      val hasTarget = (json \ "target").extractOrElse[String](null)
+    val hasId = (json \ "id").extractOrElse[String](null)
+    val hasSource = (json \ "source").extractOrElse[String](null)
+    val hasTarget = (json \ "target").extractOrElse[String](null)
 
-      hasSource != null && hasTarget != null
+    if (hasId == null && hasSource != null && hasTarget != null) {
+      "relation"
+    } else if (hasId != null) {
+      val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2))
+
+      oafType match {
+        case "result" =>
+          (json \ "resulttype" \ "classid").extractOrElse[String](null) match {
+            case "other" => "otherresearchproduct"
+            case any     => any
+          }
+        case _ => oafType
+      }
     } else {
-      val hasId = (json \ "id").extractOrElse[String](null)
-      val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
-
-      hasId != null && oafType.startsWith(resultType)
+      null
    }
-
   }
 }
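getOafType now classifies a raw record on its own: a record with source and target but no id is a relation, otherwise the id prefix selects the entity type through ModelSupport.idPrefixEntity, and for results the resulttype.classid refines it ("other" becomes otherresearchproduct). A small illustration with made-up records, calling the Scala object from Java exactly as the new CollectNewOafResults job does:

```java
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;

public class GetOafTypeSketch {

	public static void main(String[] args) {
		// id prefix "50" maps to result; resulttype.classid "other" is rewritten to otherresearchproduct
		String orp = "{\"id\":\"50|doi_________::0000\",\"resulttype\":{\"classid\":\"other\"}}";
		// no id, but source and target present: classified as relation
		String rel = "{\"source\":\"50|a\",\"target\":\"50|b\",\"relClass\":\"Cites\"}";

		System.out.println(CopyHdfsOafSparkApplication.getOafType(orp)); // otherresearchproduct
		System.out.println(CopyHdfsOafSparkApplication.getOafType(rel)); // relation
	}
}
```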
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java
index 3c6fc9c7fc..16407c65f3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplicationTest.java
@@ -1,8 +1,8 @@
 
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.IOException;
 
@@ -11,67 +11,24 @@
 import org.junit.jupiter.api.Test;
 
 public class CopyHdfsOafSparkApplicationTest {
 
+	String getResourceAsStream(String path) throws IOException {
+		return IOUtils.toString(getClass().getResourceAsStream(path));
+	}
+
 	@Test
 	void testIsOafType() throws IOException {
-		assertTrue(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
-					"publication"));
-		assertTrue(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
-					"dataset"));
-		assertTrue(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")),
-					"relation"));
-
-		assertFalse(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
-					"dataset"));
-		assertFalse(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
-					"publication"));
-
-		assertTrue(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass()
								.getResourceAsStream(
-									"/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")),
-					"publication"));
+		assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
+		assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")));
+		assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")));
+		assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
+		assertEquals(
+			"publication",
+			getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")));
 	}
 
 	@Test
 	void isOafType_Datacite_ORP() throws IOException {
-		assertTrue(
-			CopyHdfsOafSparkApplication
-				.isOafType(
-					IOUtils
-						.toString(
-							getClass()
-								.getResourceAsStream(
-									"/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")),
-					"otherresearchproduct"));
+		assertEquals(
+			"otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")));
 	}
-
 }
diff --git a/dhp-workflows/dhp-incremental-graph/pom.xml b/dhp-workflows/dhp-incremental-graph/pom.xml
new file mode 100644
index 0000000000..ba294072ce
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/pom.xml
@@ -0,0 +1,77 @@
+ + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-incremental-graph + + + + net.alchim31.maven + scala-maven-plugin + ${net.alchim31.maven.version} + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + scala-doc + process-resources + + doc + + + + + true + ${scala.binary.version} + ${scala.version} + + + + + + + + + eu.dnetlib.dhp + dhp-aggregation + ${project.version} + + + eu.dnetlib.dhp + dhp-enrichment + ${project.version} + + + eu.dnetlib.dhp + dhp-graph-mapper + ${project.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + + + +
\ No newline at end of file
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java
new file mode 100644
index 0000000000..ea9131cf17
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/CollectNewOafResults.java
@@ -0,0 +1,161 @@
+
+package eu.dnetlib.dhp.incremental;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static org.apache.spark.sql.functions.udf;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.rest.DNetRestClient;
+import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
+import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
+import scala.collection.JavaConversions;
+
+public class CollectNewOafResults {
+	private static final Logger log = LoggerFactory.getLogger(CollectNewOafResults.class);
+
+	private final ArgumentApplicationParser parser;
+
+	public CollectNewOafResults(ArgumentApplicationParser parser) {
+		this.parser = parser;
+	}
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				CollectNewOafResults.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String wrkdirPath = parser.get("workingDir");
+		log.info("workingDir is {}", wrkdirPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath is {}", outputPath);
+
+		final String mdStoreManagerURI = parser.get("mdStoreManagerURI");
+		log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
+
+		final String mdStoreID = parser.get("mdStoreID");
+		if (StringUtils.isBlank(mdStoreID)) {
+			throw new IllegalArgumentException("missing or empty argument mdStoreID");
+		}
+
+		final String hiveDbName = parser.get("hiveDbName");
+		log.info("hiveDbName is {}", hiveDbName);
+
+		final MDStoreVersion currentVersion = DNetRestClient
+			.doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
+
+		log.info("mdstore data is {}", currentVersion.toString());
+
+		try {
+
+			SparkConf conf = new SparkConf();
+			conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
+
+			runWithSparkHiveSession(
+				conf,
+				isSparkSessionManaged,
+				spark -> {
+					// ids in the current graph
+					Dataset<Row> currentIds = spark
+						.table(hiveDbName + ".result")
+						.select("id")
+						.union(
+							spark
+								.table(hiveDbName + ".relation")
+								.where("relClass = 'merges'")
+								.selectExpr("target as id"))
+						.distinct();
+
+					UserDefinedFunction getOafType = udf(
+						(String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType);
+
+					// new collected ids
+					spark
+						.read()
+						.text(currentVersion.getHdfsPath() + "/store")
+						.selectExpr(
+							"value",
+							"get_json_object(value, '$.id') AS id")
+						.where("id IS NOT NULL")
+						.join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti")
+						.withColumn("oaftype", getOafType.apply(new Column("value")))
+						.write()
+						.partitionBy("oaftype")
+						.mode(SaveMode.Overwrite)
+						.option("compression", "gzip")
+						.parquet(wrkdirPath + "/entities");
+
+					ModelSupport.oafTypes
+						.keySet()
+						.forEach(
+							entity -> spark
+								.read()
+								.parquet(wrkdirPath + "/entities")
+								.filter("oaftype = '" + entity + "'")
+								.select("value")
+								.write()
+								.option("compression", "gzip")
+								.mode(SaveMode.Append)
+								.text(outputPath + "/" + entity));
+
+					Dataset<Row> newIds = spark.read().parquet(wrkdirPath + "/entities").select("id");
+
+					Dataset<Row> rels = spark
+						.read()
+						.text(currentVersion.getHdfsPath() + "/store")
+						.selectExpr(
+							"value",
+							"get_json_object(value, '$.source') AS source",
+							"get_json_object(value, '$.target') AS target")
+						.where("source IS NOT NULL AND target IS NOT NULL");
+
+					rels
+						.join(
+							newIds.selectExpr("id as source"),
+							JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi")
+						.union(
+							rels
+								.join(
+									newIds.selectExpr("id as target"),
+									JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi"))
+						.distinct()
+						.select("value")
+						.write()
+						.option("compression", "gzip")
+						.mode(SaveMode.Append)
+						.text(outputPath + "/relation");
+				});
+		} finally {
+			DNetRestClient
+				.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId()));
+		}
+	}
+}
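The collection step relies on two standard join tricks: a left_anti join against the ids already in the graph (including targets of 'merges' relations) to isolate genuinely new records, and left_semi joins to keep only relations that touch a new id. A toy, self-contained sketch of that pattern; the ids and session setup are invented:

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class IncrementalJoinSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate();

		Dataset<Row> graphIds = spark
			.createDataset(Arrays.asList("id1", "id2"), Encoders.STRING())
			.toDF("id");
		Dataset<Row> collectedIds = spark
			.createDataset(Arrays.asList("id2", "id3"), Encoders.STRING())
			.toDF("id");

		// left_anti: drop everything already known to the graph -> only id3 survives
		Dataset<Row> newIds = collectedIds
			.join(graphIds, collectedIds.col("id").equalTo(graphIds.col("id")), "left_anti");

		Dataset<Row> rels = spark
			.createDataset(
				Arrays.asList(new Tuple2<>("id3", "id1"), new Tuple2<>("id1", "id2")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
			.toDF("source", "target");

		// left_semi: keep relations whose source is a new id -> ("id3", "id1")
		Dataset<Row> touched = rels
			.join(newIds, rels.col("source").equalTo(newIds.col("id")), "left_semi");

		newIds.show();
		touched.show();
	}
}
```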
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala
new file mode 100644
index 0000000000..63c17a74a3
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala
@@ -0,0 +1,96 @@
+package eu.dnetlib.dhp.incremental
+
+import eu.dnetlib.dhp.PropagationConstant
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.bulktag.community.TaggingConstants
+import eu.dnetlib.dhp.schema.common.ModelSupport
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}
+
+object SparkAppendContextCleanedGraph {
+
+  def main(args: Array[String]): Unit = {
+    val log: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+
+    val parser = new ArgumentApplicationParser(
+      IOUtils.toString(
+        getClass.getResourceAsStream(
+          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
+        )
+      )
+    )
+    parser.parseArgument(args)
+    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
+
+    val graphBasePath = parser.get("graphBasePath")
+    log.info(s"graphBasePath -> $graphBasePath")
+    val relationPath = parser.get("relationPath")
+    log.info(s"relationPath -> $relationPath")
+    val targetPath = parser.get("targetGraph")
+    log.info(s"targetGraph -> $targetPath")
+
+    val hiveDbName = parser.get("hiveDbName")
+    log.info(s"hiveDbName -> $hiveDbName")
+
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .enableHiveSupport()
+        .appName(getClass.getSimpleName)
+        .getOrCreate()
+
+    for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
+      if (classOf[OafEntity].isAssignableFrom(clazz)) {
+        val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]
+
+        spark
+          .table(s"${hiveDbName}.${entity}")
+          .as(classEnc)
+          .map(e => {
+            val oaf = e.asInstanceOf[OafEntity]
+            if (oaf.getContext != null) {
+              val newContext = oaf.getContext.asScala
+                .map(c => {
+                  if (c.getDataInfo != null) {
+                    c.setDataInfo(
+                      c.getDataInfo.asScala
+                        .filter(
+                          di =>
+                            !PropagationConstant.PROPAGATION_DATA_INFO_TYPE.equals(di.getInferenceprovenance)
+                              && !TaggingConstants.BULKTAG_DATA_INFO_TYPE.equals(di.getInferenceprovenance)
+                        )
+                        .toList
+                        .asJava
+                    )
+                  }
+                  c
+                })
+                .filter(!_.getDataInfo.isEmpty)
+                .toList
+                .asJava
+              oaf.setContext(newContext)
+            }
+            e
+          })(classEnc)
+          .write
+          .option("compression", "gzip")
+          .mode(SaveMode.Append)
+          .json(s"$targetPath/${entity}")
+      } else {
+        spark
+          .table(s"${hiveDbName}.${entity}")
+          .write
+          .option("compression", "gzip")
+          .mode(SaveMode.Append)
+          .json(s"$targetPath/${entity}")
+      }
+    }
+  }
+}
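The cleaning rule applied above is simple: inside each context, drop the dataInfo entries whose inferenceprovenance comes from propagation or bulk tagging, then drop contexts left with no dataInfo at all, presumably so those decorations can be recomputed on the merged graph. An illustrative plain-Java restatement of the rule on the OAF Context beans; this helper is not part of the PR:

```java
import java.util.List;
import java.util.stream.Collectors;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.bulktag.community.TaggingConstants;
import eu.dnetlib.dhp.schema.oaf.Context;

public class ContextCleaningSketch {

	// returns the contexts that survive the cleanup described above
	public static List<Context> cleanContexts(List<Context> contexts) {
		return contexts
			.stream()
			.peek(c -> {
				if (c.getDataInfo() != null) {
					c.setDataInfo(
						c.getDataInfo()
							.stream()
							.filter(di -> !PropagationConstant.PROPAGATION_DATA_INFO_TYPE.equals(di.getInferenceprovenance()))
							.filter(di -> !TaggingConstants.BULKTAG_DATA_INFO_TYPE.equals(di.getInferenceprovenance()))
							.collect(Collectors.toList()));
				}
			})
			.filter(c -> c.getDataInfo() != null && !c.getDataInfo().isEmpty())
			.collect(Collectors.toList());
	}
}
```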
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala
new file mode 100644
index 0000000000..44f09bbce1
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkResolveRelationById.scala
@@ -0,0 +1,89 @@
+package eu.dnetlib.dhp.incremental
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Relation
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions.{col, expr}
+import org.slf4j.{Logger, LoggerFactory}
+
+object SparkResolveRelationById {
+
+  def main(args: Array[String]): Unit = {
+    val log: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+
+    val parser = new ArgumentApplicationParser(
+      IOUtils.toString(
+        getClass.getResourceAsStream(
+          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
+        )
+      )
+    )
+    parser.parseArgument(args)
+    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
+
+    val graphBasePath = parser.get("graphBasePath")
+    log.info(s"graphBasePath -> $graphBasePath")
+    val relationPath = parser.get("relationPath")
+    log.info(s"relationPath -> $relationPath")
+    val targetPath = parser.get("targetGraph")
+    log.info(s"targetGraph -> $targetPath")
+
+    val hiveDbName = parser.get("hiveDbName")
+    log.info(s"hiveDbName -> $hiveDbName")
+
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .enableHiveSupport()
+        .appName(getClass.getSimpleName)
+        .getOrCreate()
+
+    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
+
+    val mergedrels =
+      spark.table(s"${hiveDbName}.relation").where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId")
+
+    spark.read
+      .schema(Encoders.bean(classOf[Relation]).schema)
+      .json(s"$graphBasePath/relation")
+      .as[Relation]
+      .map(r => resolveRelations(r))
+      .join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
+      .withColumn("source", expr("coalesce(dedupId, source)"))
+      .drop("mergedId", "dedupId")
+      .join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
+      .withColumn("target", expr("coalesce(dedupId, target)"))
+      .drop("mergedId", "dedupId")
+      .write
+      .option("compression", "gzip")
+      .mode(SaveMode.Overwrite)
+      .json(s"$targetPath/relation")
+  }
+
+  private def resolveRelations(r: Relation): Relation = {
+    if (r.getSource.startsWith("unresolved::"))
+      r.setSource(resolvePid(r.getSource.substring(12)))
+
+    if (r.getTarget.startsWith("unresolved::"))
+      r.setTarget(resolvePid(r.getTarget.substring(12)))
+
+    r
+  }
+
+  private def resolvePid(str: String): String = {
+    val parts = str.split("::")
+    val id = parts(0)
+    val scheme: String = parts.last match {
+      case "arxiv" => "arXiv"
+      case _       => parts.last
+    }
+
+    IdentifierFactory.idFromPid("50", scheme, id, true)
+  }
+
+}
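Unresolved relation endpoints have the form unresolved::<pid>::<pidType>; the job rewrites them to the OpenAIRE result identifier derived from the pid (normalising the "arxiv" scheme to "arXiv") and then redirects any endpoint covered by a 'merges' relation to its dedup record. A small sketch of the pid-resolution step alone, using a made-up DOI; the generated id depends on IdentifierFactory's hashing, so it is only printed:

```java
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

public class ResolvePidSketch {

	public static void main(String[] args) {
		String endpoint = "unresolved::10.1234/example.doi::doi"; // invented identifier

		String[] parts = endpoint.substring("unresolved::".length()).split("::");
		String pidValue = parts[0]; // 10.1234/example.doi
		String pidType = parts[parts.length - 1]; // doi ("arxiv" would be mapped to "arXiv")

		// "50" is the id prefix used for results; the final flag enables md5 hashing of the pid value
		System.out.println(IdentifierFactory.idFromPid("50", pidType, pidValue, true));
	}
}
```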
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json
new file mode 100644
index 0000000000..ef0838fc98
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json
@@ -0,0 +1,44 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "mu",
+    "paramLongName": "mdStoreManagerURI",
+    "paramDescription": "the MDStore Manager URI",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mi",
+    "paramLongName": "mdStoreID",
+    "paramDescription": "the Metadata Store ID",
+    "paramRequired": false
+  },
+  {
+    "paramName": "wd",
+    "paramLongName": "workingDir",
+    "paramDescription": "the path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path to store the collected new records",
+    "paramRequired": true
+  },
+  {
+    "paramName": "hmu",
+    "paramLongName": "hiveMetastoreUris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName": "db",
+    "paramLongName": "hiveDbName",
+    "paramDescription": "the graph hive database name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml
new file mode 100644
index 0000000000..c8cf80b728
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+ + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + +
\ No newline at end of file
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml
new file mode 100644
index 0000000000..f2242b644f
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/collect/oozie_app/workflow.xml
@@ -0,0 +1,77 @@
+ + + + mdStoreManagerURI + the MDStore Manager URI + + + mdStoreID + the identifier of the native MDStore + + + outputPath + outputDirectory + + + workingDir + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + hiveDbName + hive database containing last generated graph + + + + sparkClusterOpts + --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + +
sparkResourceOpts + --executor-memory=8G --conf spark.executor.memoryOverhead=6G --executor-cores=6 --driver-memory=9G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=1024 + spark resource options + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Collect New Oaf Results + eu.dnetlib.dhp.incremental.CollectNewOafResults + dhp-incremental-graph-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath} + --workingDir${workingDir}/collect_new_results + --mdStoreID${mdStoreID} + --mdStoreManagerURI${mdStoreManagerURI} + --hiveMetastoreUris${hiveMetastoreUris} + --hiveDbName${hiveDbName} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml new file mode 100644 index 0000000000..9608732eda --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml new file mode 100644 index 0000000000..428fdf0005 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/export_hive/oozie_app/workflow.xml @@ -0,0 +1,158 @@ + + + + + outputPath + the source path + + + hiveDbName + the target hive database name + + + hiveMetastoreUris + hive server metastore URIs + + + + sparkClusterOpts + --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=1024 + spark resource options + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + yarn + cluster + Merge table publication + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob + dhp-graph-mapper-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath}/publication + --modeappend + --hiveTableName${hiveDbName}.publication + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + yarn + cluster + Merge table dataset + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob + dhp-graph-mapper-${projectVersion}.jar + + --conf 
spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath}/dataset + --modeappend + --hiveTableName${hiveDbName}.dataset + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + yarn + cluster + Merge table otherresearchproduct + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob + dhp-graph-mapper-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath}/otherresearchproduct + --modeappend + --hiveTableName${hiveDbName}.otherresearchproduct + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + yarn + cluster + Merge table software + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob + dhp-graph-mapper-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath}/software + --modeappend + --hiveTableName${hiveDbName}.software + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + yarn + cluster + Merge table relation + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob + dhp-graph-mapper-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --outputPath${outputPath}/relation + --modeappend + --hiveTableName${hiveDbName}.relation + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml new file mode 100644 index 0000000000..17bb706477 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql new file mode 100644 index 0000000000..c0e1fd87ab --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql @@ -0,0 +1,11 @@ +INSERT OVERWRITE DIRECTORY '${outputPath}/datasource' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */ + +INSERT OVERWRITE DIRECTORY '${outputPath}/organization' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */ + +INSERT OVERWRITE DIRECTORY '${outputPath}/project' + USING json OPTIONS ('compression' 'gzip') + SELECT * FROM `${hiveDbName}`.`project`; /* EOS */ \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml 
b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml new file mode 100644 index 0000000000..67532ec668 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/workflow.xml @@ -0,0 +1,69 @@ + + + + outputPath + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + sparkSqlWarehouseDir + + + hiveDbName + hive database containing last generated graph + + + action + + + + sparkClusterOpts + --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=1024 + spark resource options + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Copy data from last graph + eu.dnetlib.dhp.oozie.RunSQLSparkJob + dhp-incremental-graph-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --hiveMetastoreUris${hiveMetastoreUris} + --sqleu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/${action}.sql + --hiveDbName${hiveDbName} + --outputPath${outputPath} + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml new file mode 100644 index 0000000000..c8cf80b728 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml new file mode 100644 index 0000000000..4b5b13e857 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/oozie_app/workflow.xml @@ -0,0 +1,151 @@ + + + + graphBasePath + input graph to resolve + + + targetGraph + outputDirectory + + + workingDir + outputDirectory + + + hiveMetastoreUris + hive server metastore URIs + + + hiveDbName + hive database containing last generated graph + + + + sparkClusterOpts + --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark 
resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=1024 + spark resource options + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn + cluster + Resolve Relations + eu.dnetlib.dhp.incremental.SparkResolveRelationById + dhp-incremental-graph-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --graphBasePath${graphBasePath} + --targetGraph${targetGraph} + --workingDir${workingDir}/resolve_relation + --hiveMetastoreUris${hiveMetastoreUris} + --hiveDbName${hiveDbName} + + + + + + + + + + + + + + + + + + + + + + ${nameNode}/${graphBasePath}/dataset + ${nameNode}/${targetGraph}/dataset + + + + + + + ${nameNode}/${graphBasePath}/datasource + ${nameNode}/${targetGraph}/datasource + + + + + + + ${nameNode}/${graphBasePath}/organization + ${nameNode}/${targetGraph}/organization + + + + + + + ${nameNode}/${graphBasePath}/otherresearchproduct + ${nameNode}/${targetGraph}/otherresearchproduct + + + + + + + ${nameNode}/${graphBasePath}/person + ${nameNode}/${targetGraph}/person + + + + + + + ${nameNode}/${graphBasePath}/project + ${nameNode}/${targetGraph}/project + + + + + + + ${nameNode}/${graphBasePath}/publication + ${nameNode}/${targetGraph}/publication + + + + + + + ${nameNode}/${graphBasePath}/software + ${nameNode}/${targetGraph}/software + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json new file mode 100644 index 0000000000..481bd874d3 --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/resources/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "g", + "paramLongName": "graphBasePath", + "paramDescription": "the path of the raw graph", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetGraph", + "paramDescription": "the target path", + "paramRequired": true + }, + { + "paramName": "wd", + "paramLongName": "workingDir", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the graph hive database name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 1c331d1269..4c3f7509e6 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -43,6 +43,7 @@ dhp-doiboost dhp-impact-indicators dhp-swh + dhp-incremental-graph