Implement new jobs for collecting data from the latest graph on Hive and deltas from OAF mdstores (Datacite and Crossref).
Optimized CopyHdfsOafSparkApplication.

This commit is contained in:
parent dccbcfd36c
commit 85dced4ffb
@@ -12,10 +12,7 @@ import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -84,10 +81,12 @@ public class SparkCountryPropagationJob {
        Dataset<R> res = readPath(spark, sourcePath, resultClazz);

        log.info("Reading prepared info: {}", preparedInfoPath);
        Encoder<ResultCountrySet> rcsEncoder = Encoders.bean(ResultCountrySet.class);
        Dataset<ResultCountrySet> prepared = spark
            .read()
            .schema(rcsEncoder.schema())
            .json(preparedInfoPath)
            .as(Encoders.bean(ResultCountrySet.class));
            .as(rcsEncoder);

        res
            .joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")

@@ -0,0 +1,80 @@

package eu.dnetlib.dhp.oa.graph.hive;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;

public class GraphHiveTableExporterJob {

    private static final Logger log = LoggerFactory.getLogger(GraphHiveTableExporterJob.class);

    public static void main(String[] args) throws Exception {

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    GraphHiveTableExporterJob.class
                        .getResourceAsStream(
                            "/eu/dnetlib/dhp/oa/graph/hive_db_exporter_parameters.json")));
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        int numPartitions = Optional
            .ofNullable(parser.get("numPartitions"))
            .map(Integer::valueOf)
            .orElse(-1);
        log.info("numPartitions: {}", numPartitions);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String hiveTableName = parser.get("hiveTableName");
        log.info("hiveTableName: {}", hiveTableName);

        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);

        String mode = parser.get("mode");
        log.info("mode: {}", mode);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", hiveMetastoreUris);

        runWithSparkHiveSession(
            conf, isSparkSessionManaged,
            spark -> saveGraphTable(spark, outputPath, hiveTableName, mode, numPartitions));
    }

    // protected for testing
    private static <T extends Oaf> void saveGraphTable(SparkSession spark, String outputPath, String hiveTableName,
        String mode, int numPartitions) {

        Dataset<Row> dataset = spark.table(hiveTableName);

        if (numPartitions > 0) {
            log.info("repartitioning to {} partitions", numPartitions);
            dataset = dataset.repartition(numPartitions);
        }

        dataset
            .write()
            .mode(mode)
            .option("compression", "gzip")
            .json(outputPath);
    }
}

@@ -0,0 +1,32 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "out",
    "paramLongName": "outputPath",
    "paramDescription": "the path to the graph data dump to read",
    "paramRequired": true
  },
  {
    "paramName": "mode",
    "paramLongName": "mode",
    "paramDescription": "mode (append|overwrite)",
    "paramRequired": true
  },
  {
    "paramName": "hmu",
    "paramLongName": "hiveMetastoreUris",
    "paramDescription": "the hive metastore uris",
    "paramRequired": true
  },
  {
    "paramName": "db",
    "paramLongName": "hiveTableName",
    "paramDescription": "the input hive table identifier",
    "paramRequired": true
  }
]

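A usage sketch, not part of the commit: the new exporter can also be invoked directly with the arguments declared in the parameter file above (in the workflows further below it is driven by Oozie Spark actions). The class and argument names come from this diff; the metastore URI, table name and output path are placeholder values, and a Spark/Hive runtime plus the dhp jars are assumed on the classpath.

// Illustrative sketch only: direct invocation of the exporter job added in this commit.
// Argument names match hive_db_exporter_parameters.json; the concrete values are placeholders.
public class GraphHiveTableExporterJobExample {
    public static void main(String[] args) throws Exception {
        eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob.main(new String[] {
            "--hiveMetastoreUris", "thrift://example-metastore:9083", // placeholder metastore URI
            "--hiveTableName", "graph_db.publication",                // placeholder <db>.<table>
            "--mode", "append",                                       // append or overwrite
            "--outputPath", "/tmp/graph_export/publication"           // placeholder output path
        });
    }
}
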
@@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

@@ -54,48 +52,60 @@ object CopyHdfsOafSparkApplication {
    val hdfsPath = parser.get("hdfsPath")
    log.info("hdfsPath: {}", hdfsPath)

    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]

    val paths =
      DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala

    val validPaths: List[String] =
      paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList

    val types = ModelSupport.oafTypes.entrySet.asScala
      .map(e => Tuple2(e.getKey, e.getValue))

    if (validPaths.nonEmpty) {
      val oaf = spark.read.textFile(validPaths: _*)
      val mapper =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      val oaf = spark.read
        .textFile(validPaths: _*)
        .map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
        .cache()

      types.foreach(t =>
        oaf
          .filter(o => isOafType(o, t._1))
          .map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf])
          .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          .text(s"$hdfsPath/${t._1}")
      )
      try {
        ModelSupport.oafTypes
          .keySet()
          .asScala
          .foreach(entity =>
            oaf
              .filter(s"_1 = '${entity}'")
              .selectExpr("_2")
              .write
              .option("compression", "gzip")
              .mode(SaveMode.Append)
              .text(s"$hdfsPath/${entity}")
          )
      } finally {
        oaf.unpersist()
      }
    }
  }

  def isOafType(input: String, oafType: String): Boolean = {
  def getOafType(input: String): String = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)
    if (oafType == "relation") {
      val hasSource = (json \ "source").extractOrElse[String](null)
      val hasTarget = (json \ "target").extractOrElse[String](null)

      hasSource != null && hasTarget != null
    val hasId = (json \ "id").extractOrElse[String](null)
    val hasSource = (json \ "source").extractOrElse[String](null)
    val hasTarget = (json \ "target").extractOrElse[String](null)

    if (hasId == null && hasSource != null && hasTarget != null) {
      "relation"
    } else if (hasId != null) {
      val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2))

      oafType match {
        case "result" =>
          (json \ "resulttype" \ "classid").extractOrElse[String](null) match {
            case "other" => "otherresearchproduct"
            case any => any
          }
        case _ => oafType
      }
    } else {
      val hasId = (json \ "id").extractOrElse[String](null)
      val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
      hasId != null && oafType.startsWith(resultType)
      null
    }

  }
}

@@ -1,8 +1,8 @@

package eu.dnetlib.dhp.oa.graph.raw;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;

@@ -11,67 +11,24 @@ import org.junit.jupiter.api.Test;

public class CopyHdfsOafSparkApplicationTest {

    String getResourceAsStream(String path) throws IOException {
        return IOUtils.toString(getClass().getResourceAsStream(path));
    }

    @Test
    void testIsOafType() throws IOException {
        assertTrue(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
                    "publication"));
        assertTrue(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
                    "dataset"));
        assertTrue(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")),
                    "relation"));

        assertFalse(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
                    "dataset"));
        assertFalse(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
                    "publication"));

        assertTrue(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass()
                                .getResourceAsStream(
                                    "/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")),
                    "publication"));
        assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
        assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")));
        assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")));
        assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
        assertEquals(
            "publication",
            getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")));
    }

    @Test
    void isOafType_Datacite_ORP() throws IOException {
        assertTrue(
            CopyHdfsOafSparkApplication
                .isOafType(
                    IOUtils
                        .toString(
                            getClass()
                                .getResourceAsStream(
                                    "/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")),
                    "otherresearchproduct"));
        assertEquals(
            "otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")));
    }

}

@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>eu.dnetlib.dhp</groupId>
        <artifactId>dhp-workflows</artifactId>
        <version>1.2.5-SNAPSHOT</version>
    </parent>
    <artifactId>dhp-incremental-graph</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${net.alchim31.maven.version}</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-doc</id>
                        <phase>process-resources</phase> <!-- or wherever -->
                        <goals>
                            <goal>doc</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
                    <scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>

    </build>

    <dependencies>
        <dependency>
            <groupId>eu.dnetlib.dhp</groupId>
            <artifactId>dhp-aggregation</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib.dhp</groupId>
            <artifactId>dhp-enrichment</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib.dhp</groupId>
            <artifactId>dhp-graph-mapper</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
        </dependency>
    </dependencies>

</project>

@@ -0,0 +1,161 @@

package eu.dnetlib.dhp.incremental;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static org.apache.spark.sql.functions.udf;

import java.util.Collections;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import scala.collection.JavaConversions;

public class CollectNewOafResults {
    private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);

    private final ArgumentApplicationParser parser;

    public CollectNewOafResults(ArgumentApplicationParser parser) {
        this.parser = parser;
    }

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                CollectNewOafResults.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String wrkdirPath = parser.get("workingDir");
        log.info("workingDir is {}", wrkdirPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath is {}", outputPath);

        final String mdStoreManagerURI = parser.get("mdStoreManagerURI");
        log.info("mdStoreManagerURI is {}", mdStoreManagerURI);

        final String mdStoreID = parser.get("mdStoreID");
        if (StringUtils.isBlank(mdStoreID)) {
            throw new IllegalArgumentException("missing or empty argument mdStoreID");
        }

        final String hiveDbName = parser.get("hiveDbName");
        log.info("hiveDbName is {}", hiveDbName);

        final MDStoreVersion currentVersion = DNetRestClient
            .doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);

        log.info("mdstore data is {}", currentVersion.toString());

        try {

            SparkConf conf = new SparkConf();
            conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));

            runWithSparkHiveSession(
                conf,
                isSparkSessionManaged,
                spark -> {
                    // ids in the current graph
                    Dataset<Row> currentIds = spark
                        .table(hiveDbName + ".result")
                        .select("id")
                        .union(
                            spark
                                .table(hiveDbName + ".relation")
                                .where("relClass = 'merges'")
                                .selectExpr("target as id"))
                        .distinct();

                    UserDefinedFunction getOafType = udf(
                        (String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType);

                    // new collected ids
                    spark
                        .read()
                        .text(currentVersion.getHdfsPath() + "/store")
                        .selectExpr(
                            "value",
                            "get_json_object(value, '$.id') AS id")
                        .where("id IS NOT NULL")
                        .join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti")
                        .withColumn("oaftype", getOafType.apply(new Column("value")))
                        .write()
                        .partitionBy("oaftype")
                        .mode(SaveMode.Overwrite)
                        .option("compression", "gzip")
                        .parquet(wrkdirPath + "/entities");

                    ModelSupport.oafTypes
                        .keySet()
                        .forEach(
                            entity -> spark
                                .read()
                                .parquet(wrkdirPath + "/entities")
                                .filter("oaftype = '" + entity + "'")
                                .select("value")
                                .write()
                                .option("compression", "gzip")
                                .mode(SaveMode.Append)
                                .text(outputPath + "/" + entity));

                    Dataset<Row> newIds = spark.read().parquet(wrkdirPath + "/entities").select("id");

                    Dataset<Row> rels = spark
                        .read()
                        .text(currentVersion.getHdfsPath() + "/store")
                        .selectExpr(
                            "value",
                            "get_json_object(value, '$.source') AS source",
                            "get_json_object(value, '$.target') AS target")
                        .where("source IS NOT NULL AND target IS NOT NULL");

                    rels
                        .join(
                            newIds.selectExpr("id as source"),
                            JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi")
                        .union(
                            rels
                                .join(
                                    newIds.selectExpr("id as target"),
                                    JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi"))
                        .distinct()
                        .select("value")
                        .write()
                        .option("compression", "gzip")
                        .mode(SaveMode.Append)
                        .text(outputPath + "/relation");
                });
        } finally {
            DNetRestClient
                .doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId()));
        }
    }
}

@@ -0,0 +1,96 @@
package eu.dnetlib.dhp.incremental

import eu.dnetlib.dhp.PropagationConstant
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}

object SparkAppendContextCleanedGraph {

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream(
          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
        )
      )
    )
    parser.parseArgument(args)
    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))

    val graphBasePath = parser.get("graphBasePath")
    log.info(s"graphBasePath -> $graphBasePath")
    val relationPath = parser.get("relationPath")
    log.info(s"relationPath -> $relationPath")
    val targetPath = parser.get("targetGraph")
    log.info(s"targetGraph -> $targetPath")

    val hiveDbName = parser.get("hiveDbName")
    log.info(s"hiveDbName -> $hiveDbName")

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .enableHiveSupport()
        .appName(getClass.getSimpleName)
        .getOrCreate()

    for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
      if (classOf[OafEntity].isAssignableFrom(clazz)) {
        val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]

        spark
          .table(s"${hiveDbName}.${entity}")
          .as(classEnc)
          .map(e => {
            val oaf = e.asInstanceOf[OafEntity]
            if (oaf.getContext != null) {
              val newContext = oaf.getContext.asScala
                .map(c => {
                  if (c.getDataInfo != null) {
                    c.setDataInfo(
                      c.getDataInfo.asScala
                        .filter(
                          di =>
                            !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
                              && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
                        )
                        .toList
                        .asJava
                    )
                  }
                  c
                })
                .filter(!_.getDataInfo.isEmpty)
                .toList
                .asJava
              oaf.setContext(newContext)
            }
            e
          })(classEnc)
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          .json(s"$targetPath/${entity}")
      } else {
        spark
          .table(s"${hiveDbName}.${entity}")
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          .json(s"$targetPath/${entity}")
      }
    }
  }
}

@@ -0,0 +1,89 @@
package eu.dnetlib.dhp.incremental

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.slf4j.{Logger, LoggerFactory}

object SparkResolveRelationById {

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream(
          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
        )
      )
    )
    parser.parseArgument(args)
    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))

    val graphBasePath = parser.get("graphBasePath")
    log.info(s"graphBasePath -> $graphBasePath")
    val relationPath = parser.get("relationPath")
    log.info(s"relationPath -> $relationPath")
    val targetPath = parser.get("targetGraph")
    log.info(s"targetGraph -> $targetPath")

    val hiveDbName = parser.get("hiveDbName")
    log.info(s"hiveDbName -> $hiveDbName")

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .enableHiveSupport()
        .appName(getClass.getSimpleName)
        .getOrCreate()

    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])

    val mergedrels =
      spark.table(s"${hiveDbName}.relation").where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId")

    spark.read
      .schema(Encoders.bean(classOf[Relation]).schema)
      .json(s"$graphBasePath/relation")
      .as[Relation]
      .map(r => resolveRelations(r))
      .join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
      .withColumn("source", expr("coalesce(dedupId, source)"))
      .drop("mergedId", "dedupID")
      .join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
      .withColumn("target", expr("coalesce(dedupId, target)"))
      .drop("mergedId", "dedupID")
      .write
      .option("compression", "gzip")
      .mode(SaveMode.Overwrite)
      .json(s"$targetPath/relation")
  }

  private def resolveRelations(r: Relation): Relation = {
    if (r.getSource.startsWith("unresolved::"))
      r.setSource(resolvePid(r.getSource.substring(12)))

    if (r.getTarget.startsWith("unresolved::"))
      r.setTarget(resolvePid(r.getTarget.substring(12)))

    r
  }

  private def resolvePid(str: String): String = {
    val parts = str.split("::")
    val id = parts(0)
    val scheme: String = parts.last match {
      case "arxiv" => "arXiv"
      case _ => parts.last
    }

    IdentifierFactory.idFromPid("50", scheme, id, true)
  }

}

@@ -0,0 +1,44 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "mu",
    "paramLongName": "mdStoreManagerURI",
    "paramDescription": "the MDStore Manager URI",
    "paramRequired": true
  },
  {
    "paramName": "mi",
    "paramLongName": "mdStoreID",
    "paramDescription": "the Metadata Store ID",
    "paramRequired": false
  },
  {
    "paramName": "wd",
    "paramLongName": "workingDir",
    "paramDescription": "the path to store the output graph",
    "paramRequired": true
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "",
    "paramRequired": true
  },
  {
    "paramName": "hmu",
    "paramLongName": "hiveMetastoreUris",
    "paramDescription": "the hive metastore uris",
    "paramRequired": true
  },
  {
    "paramName": "db",
    "paramLongName": "hiveDbName",
    "paramDescription": "the graph hive database name",
    "paramRequired": true
  }
]

@@ -0,0 +1,23 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>oozie.action.sharelib.for.spark</name>
        <value>spark2</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>

</configuration>

@@ -0,0 +1,77 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>mdStoreManagerURI</name>
            <description>the path of the cleaned mdstore</description>
        </property>
        <property>
            <name>mdStoreID</name>
            <description>the identifier of the native MDStore</description>
        </property>
        <property>
            <name>outputPath</name>
            <description>outputDirectory</description>
        </property>
        <property>
            <name>workingDir</name>
            <description>outputDirectory</description>
        </property>
        <property>
            <name>hiveMetastoreUris</name>
            <description>hive server metastore URIs</description>
        </property>
        <property>
            <name>hiveDbName</name>
            <description>hive database containing last generated graph</description>
        </property>
        <!-- General oozie workflow properties -->
        <property>
            <name>sparkClusterOpts</name>
            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
            <description>spark cluster-wide options</description>
        </property>
        <property>
            <name>sparkResourceOpts</name>
            <value>--executor-memory=8G --conf spark.executor.memoryOverhead=6G --executor-cores=6 --driver-memory=9G --driver-cores=4</value>
            <description>spark resource options</description>
        </property>
        <property>
            <name>sparkApplicationOpts</name>
            <value>--conf spark.sql.shuffle.partitions=1024</value>
            <description>spark resource options</description>
        </property>
    </parameters>

    <start to="CollectJob"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="CollectJob">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Collect New Oaf Results</name>
            <class>eu.dnetlib.dhp.incremental.CollectNewOafResults</class>
            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
            <arg>--workingDir</arg><arg>${workingDir}/collect_new_results</arg>
            <arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>

</workflow-app>

@@ -0,0 +1,26 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>hiveJdbcUrl</name>
        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
    </property>
    <property>
        <name>hiveDbName</name>
        <value>openaire</value>
    </property>
</configuration>

@@ -0,0 +1,158 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">

    <parameters>
        <property>
            <name>outputPath</name>
            <description>the source path</description>
        </property>
        <property>
            <name>hiveDbName</name>
            <description>the target hive database name</description>
        </property>
        <property>
            <name>hiveMetastoreUris</name>
            <description>hive server metastore URIs</description>
        </property>
        <!-- General oozie workflow properties -->
        <property>
            <name>sparkClusterOpts</name>
            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
            <description>spark cluster-wide options</description>
        </property>
        <property>
            <name>sparkResourceOpts</name>
            <value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
            <description>spark resource options</description>
        </property>
        <property>
            <name>sparkApplicationOpts</name>
            <value>--conf spark.sql.shuffle.partitions=1024</value>
            <description>spark resource options</description>
        </property>
    </parameters>

    <start to="fork_export"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <fork name="fork_export">
        <path start="export_publication"/>
        <path start="export_dataset"/>
        <path start="export_otherresearchproduct"/>
        <path start="export_software"/>
        <path start="export_relation"/>
    </fork>

    <action name="export_publication">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge table publication</name>
            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
            <arg>--mode</arg><arg>append</arg>
            <arg>--hiveTableName</arg><arg>${hiveDbName}.publication</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="join_export"/>
        <error to="Kill"/>
    </action>
    <action name="export_dataset">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge table dataset</name>
            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
            <arg>--mode</arg><arg>append</arg>
            <arg>--hiveTableName</arg><arg>${hiveDbName}.dataset</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="join_export"/>
        <error to="Kill"/>
    </action>
    <action name="export_otherresearchproduct">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge table otherresearchproduct</name>
            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
            <arg>--mode</arg><arg>append</arg>
            <arg>--hiveTableName</arg><arg>${hiveDbName}.otherresearchproduct</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="join_export"/>
        <error to="Kill"/>
    </action>
    <action name="export_software">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge table software</name>
            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}/software</arg>
            <arg>--mode</arg><arg>append</arg>
            <arg>--hiveTableName</arg><arg>${hiveDbName}.software</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="join_export"/>
        <error to="Kill"/>
    </action>
    <action name="export_relation">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Merge table relation</name>
            <class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableExporterJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
            <arg>--mode</arg><arg>append</arg>
            <arg>--hiveTableName</arg><arg>${hiveDbName}.relation</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
        <ok to="join_export"/>
        <error to="Kill"/>
    </action>

    <join name="join_export" to="End"/>

    <end name="End"/>

</workflow-app>

@@ -0,0 +1,26 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>oozie.action.sharelib.for.spark</name>
        <value>spark2</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>sparkSqlWarehouseDir</name>
        <value>/user/hive/warehouse</value>
    </property>
</configuration>

@@ -0,0 +1,11 @@
INSERT OVERWRITE DIRECTORY '${outputPath}/datasource'
    USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */

INSERT OVERWRITE DIRECTORY '${outputPath}/organization'
    USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */

INSERT OVERWRITE DIRECTORY '${outputPath}/project'
    USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`project`; /* EOS */

@@ -0,0 +1,69 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>outputPath</name>
            <description>outputDirectory</description>
        </property>
        <property>
            <name>hiveMetastoreUris</name>
            <description>hive server metastore URIs</description>
        </property>
        <property>
            <name>sparkSqlWarehouseDir</name>
        </property>
        <property>
            <name>hiveDbName</name>
            <description>hive database containing last generated graph</description>
        </property>
        <property>
            <name>action</name>
        </property>
        <!-- General oozie workflow properties -->
        <property>
            <name>sparkClusterOpts</name>
            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
            <description>spark cluster-wide options</description>
        </property>
        <property>
            <name>sparkResourceOpts</name>
            <value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
            <description>spark resource options</description>
        </property>
        <property>
            <name>sparkApplicationOpts</name>
            <value>--conf spark.sql.shuffle.partitions=1024</value>
            <description>spark resource options</description>
        </property>
    </parameters>

    <start to="MigrationJob"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="MigrationJob">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Copy data from last graph</name>
            <class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
            <arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/${action}.sql</arg>
            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>

</workflow-app>

@@ -0,0 +1,23 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>oozie.action.sharelib.for.spark</name>
        <value>spark2</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>

</configuration>

@@ -0,0 +1,151 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>graphBasePath</name>
            <description>input graph to resolve</description>
        </property>
        <property>
            <name>targetGraph</name>
            <description>outputDirectory</description>
        </property>
        <property>
            <name>workingDir</name>
            <description>outputDirectory</description>
        </property>
        <property>
            <name>hiveMetastoreUris</name>
            <description>hive server metastore URIs</description>
        </property>
        <property>
            <name>hiveDbName</name>
            <description>hive database containing last generated graph</description>
        </property>
        <!-- General oozie workflow properties -->
        <property>
            <name>sparkClusterOpts</name>
            <value>--conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
            <description>spark cluster-wide options</description>
        </property>
        <property>
            <name>sparkResourceOpts</name>
            <value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
            <description>spark resource options</description>
        </property>
        <property>
            <name>sparkApplicationOpts</name>
            <value>--conf spark.sql.shuffle.partitions=1024</value>
            <description>spark resource options</description>
        </property>
    </parameters>

    <start to="ResolveJob"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="ResolveJob">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Resolve Relations</name>
            <class>eu.dnetlib.dhp.incremental.SparkResolveRelationById</class>
            <jar>dhp-incremental-graph-${projectVersion}.jar</jar>
            <spark-opts>
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
                ${sparkClusterOpts}
                ${sparkResourceOpts}
                ${sparkApplicationOpts}
            </spark-opts>
            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
            <arg>--targetGraph</arg><arg>${targetGraph}</arg>
            <arg>--workingDir</arg><arg>${workingDir}/resolve_relation</arg>
            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
        </spark>
        <ok to="reset_outputpath"/>
        <error to="Kill"/>
    </action>

    <action name="reset_outputpath">
        <fs>
            <delete path="${targetGraph}/dataset"/>
            <delete path="${targetGraph}/datasource"/>
            <delete path="${targetGraph}/organization"/>
            <delete path="${targetGraph}/otherresearchproduct"/>
            <delete path="${targetGraph}/person"/>
            <delete path="${targetGraph}/project"/>
            <delete path="${targetGraph}/publication"/>
            <delete path="${targetGraph}/software"/>
        </fs>
        <ok to="copy_dataset"/>
        <error to="Kill"/>
    </action>
    <action name="copy_dataset">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/dataset</arg>
            <arg>${nameNode}/${targetGraph}/dataset</arg>
        </distcp>
        <ok to="copy_datasource"/>
        <error to="Kill"/>
    </action>
    <action name="copy_datasource">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/datasource</arg>
            <arg>${nameNode}/${targetGraph}/datasource</arg>
        </distcp>
        <ok to="copy_organization"/>
        <error to="Kill"/>
    </action>
    <action name="copy_organization">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/organization</arg>
            <arg>${nameNode}/${targetGraph}/organization</arg>
        </distcp>
        <ok to="copy_otherresearchproduct"/>
        <error to="Kill"/>
    </action>
    <action name="copy_otherresearchproduct">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
            <arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
        </distcp>
        <ok to="copy_person"/>
        <error to="Kill"/>
    </action>
    <action name="copy_person">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/person</arg>
            <arg>${nameNode}/${targetGraph}/person</arg>
        </distcp>
        <ok to="copy_project"/>
        <error to="Kill"/>
    </action>
    <action name="copy_project">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/project</arg>
            <arg>${nameNode}/${targetGraph}/project</arg>
        </distcp>
        <ok to="copy_publication"/>
        <error to="Kill"/>
    </action>
    <action name="copy_publication">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/publication</arg>
            <arg>${nameNode}/${targetGraph}/publication</arg>
        </distcp>
        <ok to="copy_software"/>
        <error to="Kill"/>
    </action>
    <action name="copy_software">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
            <arg>${nameNode}/${graphBasePath}/software</arg>
            <arg>${nameNode}/${targetGraph}/software</arg>
        </distcp>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>
</workflow-app>

@@ -0,0 +1,32 @@
[
  {
    "paramName": "g",
    "paramLongName": "graphBasePath",
    "paramDescription": "the path of the raw graph",
    "paramRequired": true
  },
  {
    "paramName": "t",
    "paramLongName": "targetGraph",
    "paramDescription": "the target path",
    "paramRequired": true
  },
  {
    "paramName": "wd",
    "paramLongName": "workingDir",
    "paramDescription": "the path to store the output graph",
    "paramRequired": true
  },
  {
    "paramName": "hmu",
    "paramLongName": "hiveMetastoreUris",
    "paramDescription": "the hive metastore uris",
    "paramRequired": true
  },
  {
    "paramName": "db",
    "paramLongName": "hiveDbName",
    "paramDescription": "the graph hive database name",
    "paramRequired": true
  }
]

@@ -43,6 +43,7 @@
        <module>dhp-doiboost</module>
        <module>dhp-impact-indicators</module>
        <module>dhp-swh</module>
        <module>dhp-incremental-graph</module>
    </modules>

    <pluginRepositories>