Implement new jobs for collecting data from latest graph on hive and deltas from oaf mdstores (datacite and crossref)

Optimized CopyHdfsOafSparkApplication
This commit is contained in:
Giambattista Bloisi 2024-10-25 15:09:52 +02:00
parent 647f8271b4
commit 1d80c1da57
16 changed files with 813 additions and 97 deletions

View File

@ -12,11 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.*;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -85,14 +81,16 @@ public class SparkCountryPropagationJob {
Dataset<R> res = readPath(spark, sourcePath, resultClazz); Dataset<R> res = readPath(spark, sourcePath, resultClazz);
log.info("Reading prepared info: {}", preparedInfoPath); log.info("Reading prepared info: {}", preparedInfoPath);
final Dataset<Row> preparedInfoRaw = spark Encoder<ResultCountrySet> rcsEncoder = Encoders.bean(ResultCountrySet.class);
final Dataset<ResultCountrySet> preparedInfoRaw = spark
.read() .read()
.json(preparedInfoPath); .schema(rcsEncoder.schema())
.json(preparedInfoPath)
.as(rcsEncoder);
if (!preparedInfoRaw.isEmpty()) { if (!preparedInfoRaw.isEmpty()) {
final Dataset<ResultCountrySet> prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class));
res res
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer") .joinWith(preparedInfoRaw, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
.map(getCountryMergeFn(), Encoders.bean(resultClazz)) .map(getCountryMergeFn(), Encoders.bean(resultClazz))
.write() .write()
.option("compression", "gzip") .option("compression", "gzip")

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
@ -54,48 +52,60 @@ object CopyHdfsOafSparkApplication {
val hdfsPath = parser.get("hdfsPath") val hdfsPath = parser.get("hdfsPath")
log.info("hdfsPath: {}", hdfsPath) log.info("hdfsPath: {}", hdfsPath)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val paths = val paths =
DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
val validPaths: List[String] = val validPaths: List[String] =
paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
val types = ModelSupport.oafTypes.entrySet.asScala
.map(e => Tuple2(e.getKey, e.getValue))
if (validPaths.nonEmpty) { if (validPaths.nonEmpty) {
val oaf = spark.read.textFile(validPaths: _*) val oaf = spark.read
val mapper = .textFile(validPaths: _*)
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .map(v => (getOafType(v), v))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.cache()
types.foreach(t => try {
oaf ModelSupport.oafTypes
.filter(o => isOafType(o, t._1)) .keySet()
.map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf]) .asScala
.map(s => mapper.writeValueAsString(s))(Encoders.STRING) .foreach(entity =>
.write oaf
.option("compression", "gzip") .filter(s"_1 = '${entity}'")
.mode(SaveMode.Append) .selectExpr("_2")
.text(s"$hdfsPath/${t._1}") .write
) .option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$hdfsPath/${entity}")
)
} finally {
oaf.unpersist()
}
} }
} }
def isOafType(input: String, oafType: String): Boolean = { def getOafType(input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input) lazy val json: org.json4s.JValue = parse(input)
if (oafType == "relation") {
val hasSource = (json \ "source").extractOrElse[String](null)
val hasTarget = (json \ "target").extractOrElse[String](null)
hasSource != null && hasTarget != null val hasId = (json \ "id").extractOrElse[String](null)
val hasSource = (json \ "source").extractOrElse[String](null)
val hasTarget = (json \ "target").extractOrElse[String](null)
if (hasId == null && hasSource != null && hasTarget != null) {
"relation"
} else if (hasId != null) {
val oafType: String = ModelSupport.idPrefixEntity.get(hasId.substring(0, 2))
oafType match {
case "result" =>
(json \ "resulttype" \ "classid").extractOrElse[String](null) match {
case "other" => "otherresearchproduct"
case any => any
}
case _ => oafType
}
} else { } else {
val hasId = (json \ "id").extractOrElse[String](null) null
val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
hasId != null && oafType.startsWith(resultType)
} }
} }
} }

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertFalse; import static eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.getOafType;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException; import java.io.IOException;
@ -11,67 +11,24 @@ import org.junit.jupiter.api.Test;
public class CopyHdfsOafSparkApplicationTest { public class CopyHdfsOafSparkApplicationTest {
String getResourceAsStream(String path) throws IOException {
return IOUtils.toString(getClass().getResourceAsStream(path));
}
@Test @Test
void testIsOafType() throws IOException { void testIsOafType() throws IOException {
assertTrue( assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
CopyHdfsOafSparkApplication assertEquals("dataset", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")));
.isOafType( assertEquals("relation", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")));
IOUtils assertEquals("publication", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")));
.toString( assertEquals(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")), "publication",
"publication")); getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
"dataset"));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/relation_1.json")),
"relation"));
assertFalse(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/publication_1.json")),
"dataset"));
assertFalse(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/dataset_1.json")),
"publication"));
assertTrue(
CopyHdfsOafSparkApplication
.isOafType(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/raw/publication_2_unknownProperty.json")),
"publication"));
} }
@Test @Test
void isOafType_Datacite_ORP() throws IOException { void isOafType_Datacite_ORP() throws IOException {
assertTrue( assertEquals(
CopyHdfsOafSparkApplication "otherresearchproduct", getOafType(getResourceAsStream("/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")));
.isOafType(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/raw/datacite_orp.json")),
"otherresearchproduct"));
} }
} }

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-incremental-graph</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${net.alchim31.maven.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>scala-doc</id>
<phase>process-resources</phase> <!-- or wherever -->
<goals>
<goal>doc</goal>
</goals>
</execution>
</executions>
<configuration>
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-aggregation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-graph-mapper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,161 @@
package eu.dnetlib.dhp.incremental;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static org.apache.spark.sql.functions.udf;
import java.util.Collections;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import scala.collection.JavaConversions;
/**
 * Spark job that extracts, from the current version of an OAF mdstore, the records whose id is not
 * yet present in the latest graph stored on Hive, together with the relations that touch them.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 * <li>calls the MDStore Manager {@code READ_LOCK_URL} endpoint for {@code mdStoreID} and obtains the
 * {@link MDStoreVersion} to read;</li>
 * <li>builds the set of ids known to the graph: ids of {@code <hiveDbName>.result} plus the targets
 * of the {@code merges} relations in {@code <hiveDbName>.relation};</li>
 * <li>anti-joins the mdstore records against those ids, tags each survivor with the type returned
 * by {@link CopyHdfsOafSparkApplication#getOafType(String)} and stores them as parquet under
 * {@code <workingDir>/entities};</li>
 * <li>appends the raw JSON of each type to {@code <outputPath>/<type>};</li>
 * <li>appends to {@code <outputPath>/relation} the mdstore relations whose source or target is one
 * of the newly collected ids;</li>
 * <li>always calls the {@code READ_UNLOCK_URL} endpoint, even when the Spark job fails.</li>
 * </ol>
 */
public class CollectNewOafResults {

	// Bound to this class so that the job output is filed under its own logger name.
	private static final Logger log = LoggerFactory.getLogger(CollectNewOafResults.class);

	private final ArgumentApplicationParser parser;

	public CollectNewOafResults(ArgumentApplicationParser parser) {
		this.parser = parser;
	}

	/**
	 * Job entry point.
	 *
	 * @param args command line arguments, parsed according to
	 *             {@code collectnewresults_input_parameters.json}
	 * @throws IllegalArgumentException when {@code mdStoreID} is missing or blank
	 * @throws Exception on argument parsing, MDStore Manager or Spark failures
	 */
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				CollectNewOafResults.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String wrkdirPath = parser.get("workingDir");
		log.info("workingDir is {}", wrkdirPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath is {}", outputPath);

		final String mdStoreManagerURI = parser.get("mdStoreManagerURI");
		log.info("mdStoreManagerURI is {}", mdStoreManagerURI);

		final String mdStoreID = parser.get("mdStoreID");
		if (StringUtils.isBlank(mdStoreID)) {
			throw new IllegalArgumentException("missing or empty argument mdStoreID");
		}
		log.info("mdStoreID is {}", mdStoreID);

		final String hiveDbName = parser.get("hiveDbName");
		log.info("hiveDbName is {}", hiveDbName);

		// The version is requested through the read-lock endpoint; the matching unlock is in the finally below.
		final MDStoreVersion currentVersion = DNetRestClient
			.doGET(String.format(MDStoreActionNode.READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
		log.info("mdstore data is {}", currentVersion.toString());

		try {
			SparkConf conf = new SparkConf();
			conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));

			runWithSparkHiveSession(
				conf,
				isSparkSessionManaged,
				spark -> {
					final String storePath = currentVersion.getHdfsPath() + "/store";
					final String entitiesPath = wrkdirPath + "/entities";

					// ids in the current graph
					Dataset<Row> currentIds = spark
						.table(hiveDbName + ".result")
						.select("id")
						.union(
							spark
								.table(hiveDbName + ".relation")
								.where("relClass = 'merges'")
								.selectExpr("target as id"))
						.distinct();

					UserDefinedFunction getOafType = udf(
						(String json) -> CopyHdfsOafSparkApplication.getOafType(json), DataTypes.StringType);

					// new collected ids: records with an id that the graph does not know yet,
					// materialized once on disk so that the following steps do not recompute the join
					spark
						.read()
						.text(storePath)
						.selectExpr(
							"value",
							"get_json_object(value, '$.id') AS id")
						.where("id IS NOT NULL")
						.join(currentIds, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti")
						.withColumn("oaftype", getOafType.apply(new Column("value")))
						.write()
						.partitionBy("oaftype")
						.mode(SaveMode.Overwrite)
						.option("compression", "gzip")
						.parquet(entitiesPath);

					// one text output per oaf type, appended to what is already in outputPath
					ModelSupport.oafTypes
						.keySet()
						.forEach(
							entity -> spark
								.read()
								.parquet(entitiesPath)
								.filter("oaftype = '" + entity + "'")
								.select("value")
								.write()
								.option("compression", "gzip")
								.mode(SaveMode.Append)
								.text(outputPath + "/" + entity));

					Dataset<Row> newIds = spark.read().parquet(entitiesPath).select("id");

					// records of the mdstore carrying both a source and a target, i.e. relations
					Dataset<Row> rels = spark
						.read()
						.text(storePath)
						.selectExpr(
							"value",
							"get_json_object(value, '$.source') AS source",
							"get_json_object(value, '$.target') AS target")
						.where("source IS NOT NULL AND target IS NOT NULL");

					// keep the relations having a newly collected id on either side, without duplicates
					rels
						.join(
							newIds.selectExpr("id as source"),
							JavaConversions.asScalaBuffer(Collections.singletonList("source")), "left_semi")
						.union(
							rels
								.join(
									newIds.selectExpr("id as target"),
									JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi"))
						.distinct()
						.select("value")
						.write()
						.option("compression", "gzip")
						.mode(SaveMode.Append)
						.text(outputPath + "/relation");
				});
		} finally {
			DNetRestClient
				.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, mdStoreManagerURI, currentVersion.getId()));
		}
	}
}

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.incremental
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.slf4j.{Logger, LoggerFactory}
/** Spark job that rewrites the relations found under `graphBasePath/relation` and stores the
  * result, as gzip-compressed JSON, under `targetGraph/relation` (overwriting it).
  *
  * Two rewrites are applied to both the source and the target of every relation:
  *  1. an endpoint starting with `unresolved::` is replaced by the identifier that
  *     `IdentifierFactory.idFromPid` computes from the PID it carries;
  *  1. an endpoint that is the target of a `merges` relation in `hiveDbName.relation` is replaced
  *     by the source of that `merges` relation.
  */
object SparkResolveRelationById {

  /** Prefix marking a relation endpoint that carries a PID instead of an identifier. */
  private val UnresolvedPrefix: String = "unresolved::"

  /** Job entry point.
    *
    * @param args command line arguments, parsed according to `resolve_relationsbyid_params.json`
    */
  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream(
          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
        )
      )
    )
    parser.parseArgument(args)
    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))

    val graphBasePath = parser.get("graphBasePath")
    log.info(s"graphBasePath -> $graphBasePath")
    val targetPath = parser.get("targetGraph")
    log.info(s"targetGraph -> $targetPath")
    val hiveDbName = parser.get("hiveDbName")
    log.info(s"hiveDbName -> $hiveDbName")

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .enableHiveSupport()
        .appName(getClass.getSimpleName)
        .getOrCreate()

    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])

    // (dedupId, mergedId) pairs taken from the 'merges' relations of the graph stored on hive
    val mergedrels =
      spark
        .table(s"${hiveDbName}.relation")
        .where("relclass = 'merges'")
        .selectExpr("source as dedupId", "target as mergedId")

    spark.read
      .schema(relEncoder.schema)
      .json(s"$graphBasePath/relation")
      .as[Relation]
      .map(r => resolveRelations(r))
      // replace a merged source with the id it was merged into
      .join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
      .withColumn("source", expr("coalesce(dedupId, source)"))
      // the helper columns must be gone before mergedrels is joined a second time
      .drop("mergedId", "dedupId")
      // same replacement for the target
      .join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
      .withColumn("target", expr("coalesce(dedupId, target)"))
      .drop("mergedId", "dedupId")
      .write
      .option("compression", "gzip")
      .mode(SaveMode.Overwrite)
      .json(s"$targetPath/relation")
  }

  /** Extracts the PID part of an unresolved endpoint.
    *
    * @param endpoint source or target of a relation, possibly null
    * @return the text following the `unresolved::` prefix, or None when the endpoint is null or
    *         does not start with that prefix
    */
  private def unresolvedPid(endpoint: String): Option[String] =
    Option(endpoint)
      .filter(_.startsWith(UnresolvedPrefix))
      .map(_.substring(UnresolvedPrefix.length))

  /** Replaces, in place, the unresolved source and/or target of a relation with the identifier
    * computed by [[resolvePid]]. Endpoints that are null or already resolved are left untouched.
    *
    * @param r the relation to update
    * @return the same instance received as input
    */
  private def resolveRelations(r: Relation): Relation = {
    unresolvedPid(r.getSource).foreach(pid => r.setSource(resolvePid(pid)))
    unresolvedPid(r.getTarget).foreach(pid => r.setTarget(resolvePid(pid)))
    r
  }

  /** Computes an identifier from a `value::...::scheme` string.
    *
    * The first `::`-separated token is used as PID value and the last one as PID scheme; the
    * scheme `arxiv` is rewritten as `arXiv`, any other scheme is passed through as it is.
    *
    * @param str the PID string, without the `unresolved::` prefix
    * @return the result of `IdentifierFactory.idFromPid("50", scheme, value, true)`
    */
  private def resolvePid(str: String): String = {
    val parts = str.split("::")
    val id = parts(0)
    val scheme: String = parts.last match {
      case "arxiv" => "arXiv"
      case other   => other
    }
    IdentifierFactory.idFromPid("50", scheme, id, true)
  }
}

View File

@ -0,0 +1,44 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "mu",
"paramLongName": "mdStoreManagerURI",
"paramDescription": "the MDStore Manager URI",
"paramRequired": true
},
{
"paramName": "mi",
"paramLongName": "mdStoreID",
"paramDescription": "the Metadata Store ID",
"paramRequired": false
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path of the working directory used for intermediate data",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path where the newly collected entities and their relations are written",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,65 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStoreManagerURI</name>
<description>the URI of the MDStore Manager service</description>
</property>
<property>
<name>mdStoreID</name>
<description>the identifier of the native MDStore</description>
</property>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>working directory for intermediate data</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
</parameters>
<start to="CollectJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CollectJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect New Oaf Results</name>
<class>eu.dnetlib.dhp.incremental.CollectNewOafResults</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=1024
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/collect_new_results</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,11 @@
INSERT OVERWRITE DIRECTORY '${outputPath}/datasource'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`datasource`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/organization'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`organization`; /* EOS */
INSERT OVERWRITE DIRECTORY '${outputPath}/project'
USING json OPTIONS ('compression' 'gzip')
SELECT * FROM `${hiveDbName}`.`project`; /* EOS */

View File

@ -0,0 +1,66 @@
<workflow-app name="NewResultsCollect_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputPath</name>
<description>outputDirectory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=1024</value>
<description>spark application options</description>
</property>
</parameters>
<start to="MigrationJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="MigrationJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy data from last graph</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/incremental/migrate/oozie_app/migration.sql</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

View File

@ -0,0 +1,138 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>input graph to resolve</description>
</property>
<property>
<name>targetGraph</name>
<description>outputDirectory</description>
</property>
<property>
<name>workingDir</name>
<description>working directory</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveDbName</name>
<description>hive database containing last generated graph</description>
</property>
</parameters>
<start to="ResolveJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResolveJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations</name>
<class>eu.dnetlib.dhp.incremental.SparkResolveRelationById</class>
<jar>dhp-incremental-graph-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--targetGraph</arg><arg>${targetGraph}</arg>
<arg>--workingDir</arg><arg>${workingDir}/resolve_relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
</spark>
<ok to="reset_outputpath"/>
<error to="Kill"/>
</action>
<action name="reset_outputpath">
<fs>
<delete path="${targetGraph}/dataset"/>
<delete path="${targetGraph}/datasource"/>
<delete path="${targetGraph}/organization"/>
<delete path="${targetGraph}/otherresearchproduct"/>
<delete path="${targetGraph}/person"/>
<delete path="${targetGraph}/project"/>
<delete path="${targetGraph}/publication"/>
<delete path="${targetGraph}/software"/>
</fs>
<ok to="copy_dataset"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/dataset</arg>
<arg>${nameNode}/${targetGraph}/dataset</arg>
</distcp>
<ok to="copy_datasource"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/datasource</arg>
<arg>${nameNode}/${targetGraph}/datasource</arg>
</distcp>
<ok to="copy_organization"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/organization</arg>
<arg>${nameNode}/${targetGraph}/organization</arg>
</distcp>
<ok to="copy_otherresearchproduct"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/otherresearchproduct</arg>
<arg>${nameNode}/${targetGraph}/otherresearchproduct</arg>
</distcp>
<ok to="copy_person"/>
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/person</arg>
<arg>${nameNode}/${targetGraph}/person</arg>
</distcp>
<ok to="copy_project"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/project</arg>
<arg>${nameNode}/${targetGraph}/project</arg>
</distcp>
<ok to="copy_publication"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/publication</arg>
<arg>${nameNode}/${targetGraph}/publication</arg>
</distcp>
<ok to="copy_software"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphBasePath}/software</arg>
<arg>${nameNode}/${targetGraph}/software</arg>
</distcp>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "g",
"paramLongName": "graphBasePath",
"paramDescription": "the path of the raw graph",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetGraph",
"paramDescription": "the target path",
"paramRequired": true
},
{
"paramName": "wd",
"paramLongName": "workingDir",
"paramDescription": "the path of the working directory",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the graph hive database name",
"paramRequired": true
}
]

View File

@ -43,6 +43,7 @@
<module>dhp-doiboost</module> <module>dhp-doiboost</module>
<module>dhp-impact-indicators</module> <module>dhp-impact-indicators</module>
<module>dhp-swh</module> <module>dhp-swh</module>
<module>dhp-incremental-graph</module>
</modules> </modules>
<pluginRepositories> <pluginRepositories>