Implement new jobs for collecting data from latest graph on hive and deltas from oaf mdstores (datacite and crossref)

Optimized CopyHdfsOafSparkApplication
This commit is contained in:
Giambattista Bloisi 2024-10-25 15:09:52 +02:00
parent dccbcfd36c
commit 6f9dc2d3ca
22 changed files with 1157 additions and 93 deletions

View File

@ -12,10 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,10 +81,12 @@ public class SparkCountryPropagationJob {
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
log.info("Reading prepared info: {}", preparedInfoPath);
Encoder<ResultCountrySet> rcsEncoder = Encoders.bean(ResultCountrySet.class);
Dataset<ResultCountrySet> prepared = spark
.read()
.schema(rcsEncoder.schema())
.json(preparedInfoPath)
.as(Encoders.bean(ResultCountrySet.class));
.as(rcsEncoder);
res
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")

View File

@ -0,0 +1,80 @@
package eu.dnetlib.dhp.oa.graph.hive;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class GraphHiveTableExporterJob {
private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GraphHiveTableExporterJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(-1);
log.info("numPartitions: {}", numPartitions);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String hiveTableName = parser.get("hiveTableName");
log.info("hiveTableName: {}", hiveTableName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
String mode = parser.get("mode");
log.info("mode: {}", mode);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(
conf, isSparkSessionManaged,
spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
}
// protected for testing
private static <T extends Oaf> void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
String mode, int numPartitions) {
Dataset<Row> dataset = spark.table(hiveTableName);
if (numPartitions > 0) {
log.info("repartitioning to {} partitions", numPartitions);
dataset = dataset.repartition(numPartitions);
}
dataset
.write()
.mode(mode)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "mode",
"paramLongName": "mode",
"paramDescription": "mode (append|overwrite)",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveTableName",
"paramDescription": "the input hive table identifier",
"paramRequired": true
}
]

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
@ -54,48 +52,60 @@ object CopyHdfsOafSparkApplication {
val hdfsPath = parser.get("hdfsPath")
log.info("hdfsPath: {}", hdfsPath)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val paths =
DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
val validPaths: List[String] =
paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
val types = ModelSupport.oafTypes.entrySet.asScala
.map(e => Tuple2(e.getKey, e.getValue))
if (validPaths.nonEmpty) {
val oaf = spark.read.textFile(validPaths: _*)
val mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val oaf = spark.read
.textFile(validPaths: _*)
.map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.cache()
types.foreach(t =>
oaf
.filter(o => isOafType(o, t._1))
.map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf])
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$hdfsPath/${t._1}")
)
try {
ModelSupport.oafTypes
.keySet()
.asScala
.foreach(entity =>
oaf
.filter(s"_1 = '${entity}'")
.selectExpr("_2")
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$hdfsPath/${entity}")
)
} finally {
oaf.unpersist()
}
}
}
def isOafType(input: String, oafType: String): Boolean = {
def getOafType(input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input)
if (oafType == "relation") {
val hasSource = (json \ "source").extractOrElse[String](null)
val hasTarget = (json \ "target").extractOrElse[String](null)
hasSource != null && hasTarget != null
val hasId = (json \ "id").extractOrElse[String](null)
val hasSource = (json \ "source").extractOrElse[String](null)
val hasTarget = (json \ "target").extractOrElse[String](null)
if (hasId == null && hasSource != null && hasTarget != null) {
"relation"
} else if (hasId != null) {
val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2))
oafType match {
case "result" =>
(json \ "resulttype" \ "classid").extractOrElse[String](null) match {
case "other" => "otherresearchproduct"
case any => any
}
case _ => oafType
}
} else {
val hasId = (json \ "id").extractOrElse[String](null)
val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
hasId != null && oafType.startsWith(resultType)
null
}
}
}

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
@ -11,67 +11,24 @@ import org.junit.jupiter.api.Test;
public class CopyHdfsOafSparkApplicationTest {
String getResourceAsStream(String path) throws IOException {
return IOUtils.toString(getClass().getResourceAsStream(path));
}
@Test
void testIsOafType() throws IOException {
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
"publication"));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
"dataset"));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")),
"relation"));
assertFalse(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
"dataset"));
assertFalse(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
"publication"));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")),
"publication"));
assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")));
assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")));
assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
assertEquals(
"publication",
getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")));
}
@Test
void isOafType_Datacite_ORP() throws IOException {
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")),
"otherresearchproduct"));
assertEquals(
"otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")));
}
}

View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-incremental-graph</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>scala-doc</id>
<phase>process-resources</phase> <!-- or wherever -->
<goals>
<goal>doc</goal>
</goals>
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-aggregation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-enrichment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-graph-mapper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,161 @@
package eu.dnetlib.dhp.incremental;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static org.apache.spark.sql.functions.udf;
import java.util.Collections;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import scala.collection.JavaConversions;
public class CollectNewOafResults {
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
private final ArgumentApplicationParser parser;
public CollectNewOafResults(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CollectNewOafResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String wrkdirPath = parser.get("workingDir");
log.info("workingDir is {}", wrkdirPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath is {}", outputPath);
final String mdStoreManagerURI = parser.get("mdStoreManagerURI");
log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
final String mdStoreID = parser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreID");
}
final String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName is {}", hiveDbName);
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
log.info("mdstore data is {}", currentVersion.toString());
try {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
// ids in the current graph
Dataset<Row> currentIds = spark
.table(hiveDbName + ".result")
.select("id")
.union(
spark
.table(hiveDbName + ".relation")
.where("relClass = 'merges'")
.selectExpr("target as id"))
.distinct();
UserDefinedFunction getOafType = udf(
(String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType);
// new collected ids
spark
.read()
.text(currentVersion.getHdfsPath() + "/store")
.selectExpr(
"value",
"get_json_object(value, '$.id') AS id")
.where("id IS NOT NULL")
.join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti")
.withColumn("oaftype", getOafType.apply(new Column("value")))
.write()
.partitionBy("oaftype")
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(wrkdirPath + "/entities");
ModelSupport.oafTypes
.keySet()
.forEach(
entity -> spark
.read()
.parquet(wrkdirPath + "/entities")
.filter("oaftype = '" + entity + "'")
.select("value")
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/" + entity));
Dataset<Row> newIds = spark.read().parquet(wrkdirPath + "/entities").select("id");
Dataset<Row> rels = spark
.read()
.text(currentVersion.getHdfsPath() + "/store")
.selectExpr(
"value",
"get_json_object(value, '$.source') AS source",
"get_json_object(value, '$.target') AS target")
.where("source IS NOT NULL AND target IS NOT NULL");
rels
.join(
newIds.selectExpr("id as source"),
JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi")
.union(
rels
.join(
newIds.selectExpr("id as target"),
JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi"))
.distinct()
.select("value")
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/relation");
});
} finally {
DNetRestClient
.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId()));
}
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.PropagationConstant
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}
object SparkAppendContextCleanedGraph {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
IOUtils.toString(
getClass.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/export_hive/append_context_cleaned_graph.json"
)
)
)
parser.parseArgument(args)
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
val outputPath = parser.get("outputPath")
log.info(s"outputPath -> $outputPath")
val hiveDbName = parser.get("hiveDbName")
log.info(s"hiveDbName -> $hiveDbName")
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.appName(getClass.getSimpleName)
.getOrCreate()
for ((entity, clazz) <- ModelSupport.oafTypes.asScala.filter(t => !Seq("datasource", "organization", "project").contains(t._1))) {
if (classOf[OafEntity].isAssignableFrom(clazz)) {
val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]
spark
.table(s"${hiveDbName}.${entity}")
.as(classEnc)
.map(e => {
val oaf = e.asInstanceOf[OafEntity]
if (oaf.getContext != null) {
val newContext = oaf.getContext.asScala
.map(c => {
if (c.getDataInfo != null) {
c.setDataInfo(
c.getDataInfo.asScala
.filter(
di =>
di == null || di.getInferenceprovenance == null ||
(!di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
&& !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE))
)
.toList
.asJava
)
}
c
})
.filter(!_.getDataInfo.isEmpty)
.toList
.asJava
oaf.setContext(newContext)
}
e
})(classEnc)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(s"$outputPath/${entity}")
} else {
spark
.table(s"${hiveDbName}.${entity}")
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(s"$outputPath/${entity}")
}
}
}
}

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.slf4j.{Logger, LoggerFactory}
object SparkResolveRelationById {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
IOUtils.toString(
getClass.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
)
)
)
parser.parseArgument(args)
conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val relationPath = parser.get("relationPath")
log.info(s"relationPath -> $relationPath")
val targetPath = parser.get("targetGraph")
log.info(s"targetGraph -> $targetPath")
val hiveDbName = parser.get("hiveDbName")
log.info(s"hiveDbName -> $hiveDbName")
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.appName(getClass.getSimpleName)
.getOrCreate()
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
val mergedrels =
spark.table(s"${hiveDbName}.relation").where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId")
spark.read
.schema(Encoders.bean(classOf[Relation]).schema)
.json(s"$graphBasePath/relation")
.as[Relation]
.map(r => resolveRelations(r))
.join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
.withColumn("source", expr("coalesce(dedupId, source)"))
.drop("mergedId", "dedupID")
.join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
.withColumn("target", expr("coalesce(dedupId, target)"))
.drop("mergedId", "dedupID")
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(s"$targetPath/relation")
}
private def resolveRelations(r: Relation): Relation = {
if (r.getSource.startsWith("unresolved::"))
r.setSource(resolvePid(r.getSource.substring(12)))
if (r.getTarget.startsWith("unresolved::"))
r.setTarget(resolvePid(r.getTarget.substring(12)))
r
}
private def resolvePid(str: String): String = {
val parts = str.split("::")
val id = parts(0)
val scheme: String = parts.last match {
case "arxiv" => "arXiv"
case _ => parts.last
}
IdentifierFactory.idFromPid("50", scheme, id, true)
}
}

View File

@ -0,0 +1,44 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "mu",
"paramLongName": "mdStoreManagerURI",
"paramDescription": "the MDStore Manager URI",
"paramRequired": true
},
{
"paramName": "mi",
"paramLongName": "mdStoreID",
"paramDescription": "the Metadata Store ID",
"paramRequired": false
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,77 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
<property>
<name>mdStoreID</name>
<description>the identifier of the native MDStore</description>
</property>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=8G --conf spark.executor.memoryOverhead=6G --executor-cores=6 --driver-memory=9G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="CollectJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CollectJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect New Oaf Results</name>
<class>eu.dnetlib.dhp.incremental.CollectNewOafResults</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/collect_new_results</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,20 @@
[
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the input hive database identifier",
"paramRequired": true
}
]

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,63 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>the source path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="merge_db_entities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="merge_db_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge Oaf Entities from hive db</name>
<class>eu.dnetlib.dhp.incremental.SparkAppendContextCleanedGraph</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,11 @@
INSERT OVERWRITE DIRECTORY '${outputPath}/datasource'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/organization'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/project'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`project`; /* EOS */

View File

@ -0,0 +1,69 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<property>
<name>action</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="MigrationJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="MigrationJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy data from last graph</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,151 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>input graph to resolve</description>
</property>
<property>
<name>targetGraph</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark resource options</description>
</property>
</parameters>
<start to="ResolveJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResolveJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations</name>
<class>eu.dnetlib.dhp.incremental.SparkResolveRelationById</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--targetGraph</arg><arg>${targetGraph}</arg>
<arg>--workingDir</arg><arg>${workingDir}/resolve_relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="reset_outputpath"/>
<error to="Kill"/>
</action>
<action name="reset_outputpath">
<fs>
<delete path="${targetGraph}/dataset"/>
<delete path="${targetGraph}/datasource"/>
<delete path="${targetGraph}/organization"/>
<delete path="${targetGraph}/otherresearchproduct"/>
<delete path="${targetGraph}/person"/>
<delete path="${targetGraph}/project"/>
<delete path="${targetGraph}/publication"/>
<delete path="${targetGraph}/software"/>
</fs>
<ok to="copy_dataset"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/dataset</arg>
<arg>${nameNode}/${targetGraph}/dataset</arg>
</distcp>
<ok to="copy_datasource"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/datasource</arg>
<arg>${nameNode}/${targetGraph}/datasource</arg>
</distcp>
<ok to="copy_organization"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/organization</arg>
<arg>${nameNode}/${targetGraph}/organization</arg>
</distcp>
<ok to="copy_otherresearchproduct"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
<arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
</distcp>
<ok to="copy_person"/>
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/person</arg>
<arg>${nameNode}/${targetGraph}/person</arg>
</distcp>
<ok to="copy_project"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/project</arg>
<arg>${nameNode}/${targetGraph}/project</arg>
</distcp>
<ok to="copy_publication"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/publication</arg>
<arg>${nameNode}/${targetGraph}/publication</arg>
</distcp>
<ok to="copy_software"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/software</arg>
<arg>${nameNode}/${targetGraph}/software</arg>
</distcp>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "g",
"paramLongName": "graphBasePath",
"paramDescription": "the path of the raw graph",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetGraph",
"paramDescription": "the target path",
"paramRequired": true
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -43,6 +43,7 @@
<module>dhp-doiboost</module>
<module>dhp-impact-indicators</module>
<module>dhp-swh</module>
<module>dhp-incremental-graph</module>
</modules>
<pluginRepositories>