forked from D-Net/dnet-hadoop
Compare commits
16 Commits
Author | SHA1 | Date |
---|---|---|
Claudio Atzori | a5f23d8a4c | |
Claudio Atzori | cce21eafc2 | |
Claudio Atzori | fde8738ed2 | |
Claudio Atzori | 3be0e5c2cd | |
Claudio Atzori | f3ce97ecf9 | |
Claudio Atzori | 771bf8bcc4 | |
Claudio Atzori | 0da1d2c0c9 | |
Claudio Atzori | 1fcc28968e | |
Claudio Atzori | da2f8af72d | |
Claudio Atzori | 4ff184973b | |
Claudio Atzori | 9e594cf4c2 | |
Claudio Atzori | ee0b2191f8 | |
Claudio Atzori | fd289d389c | |
Claudio Atzori | 2dbac631c9 | |
Claudio Atzori | 91811ab43a | |
Claudio Atzori | b0b6c6bd47 |
|
@ -195,10 +195,10 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(3432, orgs_simrel);
|
||||
assertEquals(7152, pubs_simrel);
|
||||
assertEquals(344, sw_simrel);
|
||||
assertEquals(6944, pubs_simrel);
|
||||
assertEquals(318, sw_simrel);
|
||||
assertEquals(458, ds_simrel);
|
||||
assertEquals(6750, orp_simrel);
|
||||
assertEquals(6746, orp_simrel);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -344,10 +344,10 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(1276, orgs_mergerel);
|
||||
assertEquals(1442, pubs_mergerel);
|
||||
assertEquals(288, sw_mergerel);
|
||||
assertEquals(1418, pubs_mergerel);
|
||||
assertEquals(276, sw_mergerel);
|
||||
assertEquals(472, ds_mergerel);
|
||||
assertEquals(718, orp_mergerel);
|
||||
assertEquals(716, orp_mergerel);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -391,8 +391,8 @@ public class SparkDedupTest implements Serializable {
|
|||
.count();
|
||||
|
||||
assertEquals(82, orgs_deduprecord);
|
||||
assertEquals(66, pubs_deduprecord);
|
||||
assertEquals(51, sw_deduprecord);
|
||||
assertEquals(65, pubs_deduprecord);
|
||||
assertEquals(50, sw_deduprecord);
|
||||
assertEquals(96, ds_deduprecord);
|
||||
assertEquals(89, orp_deduprecord);
|
||||
}
|
||||
|
@ -473,11 +473,11 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
assertEquals(897, publications);
|
||||
assertEquals(896, publications);
|
||||
assertEquals(835, organizations);
|
||||
assertEquals(100, projects);
|
||||
assertEquals(100, datasource);
|
||||
assertEquals(200, softwares);
|
||||
assertEquals(199, softwares);
|
||||
assertEquals(388, dataset);
|
||||
assertEquals(517, otherresearchproduct);
|
||||
|
||||
|
@ -533,7 +533,7 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||
|
||||
assertEquals(4866, relations);
|
||||
assertEquals(4828, relations);
|
||||
|
||||
// check deletedbyinference
|
||||
final Dataset<Relation> mergeRels = spark
|
||||
|
|
|
@ -168,10 +168,10 @@ public class SparkStatsTest implements Serializable {
|
|||
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
|
||||
.count();
|
||||
|
||||
assertEquals(121, orgs_blocks);
|
||||
assertEquals(110, pubs_blocks);
|
||||
assertEquals(21, sw_blocks);
|
||||
assertEquals(67, ds_blocks);
|
||||
assertEquals(55, orp_blocks);
|
||||
assertEquals(549, orgs_blocks);
|
||||
assertEquals(868, pubs_blocks);
|
||||
assertEquals(473, sw_blocks);
|
||||
assertEquals(523, ds_blocks);
|
||||
assertEquals(756, orp_blocks);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
|
|
|
@ -29,8 +29,7 @@
|
|||
},
|
||||
"pace": {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree": {
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree": {
|
||||
|
|
|
@ -71,6 +71,11 @@
|
|||
<artifactId>dhp-schemas</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-workflows-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import static eu.dnetlib.dhp.common.GraphSupport.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -14,7 +15,6 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -22,10 +22,8 @@ import org.slf4j.LoggerFactory;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.*;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
|
@ -53,50 +51,64 @@ public class CleanGraphSparkJob {
|
|||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
String inputGraph = parser.get("inputGraph");
|
||||
log.info("inputGraph: {}", inputGraph);
|
||||
|
||||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
String outputGraph = parser.get("outputGraph");
|
||||
log.info("outputGraph: {}", outputGraph);
|
||||
|
||||
String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
GraphFormat inputGraphFormat = Optional
|
||||
.ofNullable(parser.get("inputGraphFormat"))
|
||||
.map(GraphFormat::valueOf)
|
||||
.orElse(GraphFormat.DEFAULT);
|
||||
log.info("inputGraphFormat: {}", inputGraphFormat);
|
||||
|
||||
GraphFormat outputGraphFormat = Optional
|
||||
.ofNullable(parser.get("outputGraphFormat"))
|
||||
.map(GraphFormat::valueOf)
|
||||
.orElse(GraphFormat.DEFAULT);
|
||||
log.info("outputGraphFormat: {}", outputGraphFormat);
|
||||
|
||||
String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
log.info("isLookUpUrl: {}", isLookUpUrl);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
|
||||
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
|
||||
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf.set("hive.metastore.uris", hiveMetastoreUris);
|
||||
|
||||
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
|
||||
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
|
||||
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
|
||||
});
|
||||
spark -> cleanGraphTable(spark, vocs, inputGraph, inputGraphFormat, outputGraph, outputGraphFormat, clazz));
|
||||
}
|
||||
|
||||
private static <T extends Oaf> void fixGraphTable(
|
||||
private static <T extends Oaf> void cleanGraphTable(
|
||||
SparkSession spark,
|
||||
VocabularyGroup vocs,
|
||||
String inputPath,
|
||||
Class<T> clazz,
|
||||
String outputPath) {
|
||||
String inputGraph,
|
||||
GraphFormat inputGraphFormat,
|
||||
String outputGraph,
|
||||
GraphFormat outputGraphFormat,
|
||||
Class<T> clazz) {
|
||||
|
||||
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
|
||||
|
||||
readTableFromPath(spark, inputPath, clazz)
|
||||
Dataset<T> cleaned = readGraph(spark, inputGraph, clazz, inputGraphFormat)
|
||||
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
|
||||
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
|
||||
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz));
|
||||
|
||||
saveGraphTable(cleaned, clazz, outputGraph, outputGraphFormat);
|
||||
}
|
||||
|
||||
protected static <T extends Oaf> T fixVocabularyNames(T value) {
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.merge;
|
||||
|
||||
import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable;
|
||||
import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Objects;
|
||||
|
@ -20,7 +23,8 @@ import org.slf4j.LoggerFactory;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.GraphFormat;
|
||||
import eu.dnetlib.dhp.common.GraphSupport;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
@ -35,8 +39,6 @@ public class MergeGraphSparkJob {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
@ -60,46 +62,61 @@ public class MergeGraphSparkJob {
|
|||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String betaInputPath = parser.get("betaInputPath");
|
||||
log.info("betaInputPath: {}", betaInputPath);
|
||||
String betaInputGraph = parser.get("betaInputGraph");
|
||||
log.info("betaInputGraph: {}", betaInputGraph);
|
||||
|
||||
String prodInputPath = parser.get("prodInputPath");
|
||||
log.info("prodInputPath: {}", prodInputPath);
|
||||
String prodInputGraph = parser.get("prodInputGraph");
|
||||
log.info("prodInputGraph: {}", prodInputGraph);
|
||||
|
||||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
String outputGraph = parser.get("outputGraph");
|
||||
log.info("outputGraph: {}", outputGraph);
|
||||
|
||||
GraphFormat inputGraphFormat = Optional
|
||||
.ofNullable(parser.get("inputGraphFormat"))
|
||||
.map(GraphFormat::valueOf)
|
||||
.orElse(GraphFormat.DEFAULT);
|
||||
log.info("inputGraphFormat: {}", inputGraphFormat);
|
||||
|
||||
GraphFormat outputGraphFormat = Optional
|
||||
.ofNullable(parser.get("outputGraphFormat"))
|
||||
.map(GraphFormat::valueOf)
|
||||
.orElse(GraphFormat.DEFAULT);
|
||||
log.info("outputGraphFormat: {}", outputGraphFormat);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
|
||||
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
|
||||
|
||||
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
|
||||
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", hiveMetastoreUris);
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
runWithSparkSession(
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
|
||||
});
|
||||
spark -> mergeGraphTable(
|
||||
spark, priority, betaInputGraph, clazz, prodInputGraph, clazz, outputGraph, inputGraphFormat,
|
||||
outputGraphFormat));
|
||||
}
|
||||
|
||||
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
|
||||
SparkSession spark,
|
||||
String priority,
|
||||
String betaInputPath,
|
||||
String prodInputPath,
|
||||
Class<P> p_clazz,
|
||||
String betaInputGraph,
|
||||
Class<B> b_clazz,
|
||||
String outputPath) {
|
||||
String prodInputGraph,
|
||||
Class<P> p_clazz,
|
||||
String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) {
|
||||
|
||||
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
|
||||
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
|
||||
Dataset<Tuple2<String, B>> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat);
|
||||
Dataset<Tuple2<String, P>> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat);
|
||||
|
||||
prod
|
||||
Dataset<P> merged = prod
|
||||
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
|
||||
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
|
||||
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
|
||||
|
@ -112,11 +129,9 @@ public class MergeGraphSparkJob {
|
|||
return mergeWithPriorityToPROD(p, b);
|
||||
}
|
||||
}, Encoders.bean(p_clazz))
|
||||
.filter((FilterFunction<P>) Objects::nonNull)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
.filter((FilterFunction<P>) Objects::nonNull);
|
||||
|
||||
saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat);
|
||||
}
|
||||
|
||||
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
|
||||
|
@ -139,24 +154,16 @@ public class MergeGraphSparkJob {
|
|||
return null;
|
||||
}
|
||||
|
||||
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
|
||||
SparkSession spark, String inputEntityPath, Class<T> clazz) {
|
||||
private static <T extends Oaf> Dataset<Tuple2<String, T>> readGraph(
|
||||
SparkSession spark, String inputGraph, Class<T> clazz, GraphFormat inputGraphFormat) {
|
||||
|
||||
log.info("Reading Graph table from: {}", inputEntityPath);
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputEntityPath)
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, T>>) value -> {
|
||||
final T t = OBJECT_MAPPER.readValue(value, clazz);
|
||||
final String id = ModelSupport.idFn().apply(t);
|
||||
return new Tuple2<>(id, t);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
log.info("Reading Graph table from: {}", inputGraph);
|
||||
return GraphSupport
|
||||
.readGraph(spark, inputGraph, clazz, inputGraphFormat)
|
||||
.map((MapFunction<T, Tuple2<String, T>>) t -> {
|
||||
final String id = ModelSupport.idFn().apply(t);
|
||||
return new Tuple2<>(id, t);
|
||||
}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -12,7 +12,7 @@ import org.dom4j.DocumentFactory;
|
|||
import org.dom4j.DocumentHelper;
|
||||
import org.dom4j.Node;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.LicenseComparator;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
|
|
@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
|
@ -70,10 +70,10 @@ public class GenerateEntitiesApplication {
|
|||
final String targetPath = parser.get("targetPath");
|
||||
log.info("targetPath: {}", targetPath);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
log.info("isLookUpUrl: {}", isLookUpUrl);
|
||||
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
|
||||
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.common.GraphSupport.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
@ -9,29 +10,23 @@ import java.util.Optional;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.GraphFormat;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class MergeClaimsApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
|
@ -53,49 +48,47 @@ public class MergeClaimsApplication {
|
|||
final String claimsGraphPath = parser.get("claimsGraphPath");
|
||||
log.info("claimsGraphPath: {}", claimsGraphPath);
|
||||
|
||||
final String outputRawGaphPath = parser.get("outputRawGaphPath");
|
||||
log.info("outputRawGaphPath: {}", outputRawGaphPath);
|
||||
final String outputGraph = parser.get("outputGraph");
|
||||
log.info("outputGraph: {}", outputGraph);
|
||||
|
||||
final GraphFormat format = Optional
|
||||
.ofNullable(parser.get("format"))
|
||||
.map(GraphFormat::valueOf)
|
||||
.orElse(GraphFormat.DEFAULT);
|
||||
log.info("graphFormat: {}", format);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
|
||||
|
||||
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
|
||||
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", hiveMetastoreUris);
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
runWithSparkSession(
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
String type = clazz.getSimpleName().toLowerCase();
|
||||
|
||||
String rawPath = rawGraphPath + "/" + type;
|
||||
String claimPath = claimsGraphPath + "/" + type;
|
||||
String outPath = outputRawGaphPath + "/" + type;
|
||||
|
||||
removeOutputDir(spark, outPath);
|
||||
mergeByType(spark, rawPath, claimPath, outPath, clazz);
|
||||
});
|
||||
spark -> mergeByType(spark, rawGraphPath, claimsGraphPath, outputGraph, format, clazz));
|
||||
}
|
||||
|
||||
private static <T extends Oaf> void mergeByType(
|
||||
SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
|
||||
Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
|
||||
SparkSession spark, String rawPath, String claimPath, String outputGraph, GraphFormat format, Class<T> clazz) {
|
||||
Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
|
||||
.map(
|
||||
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
|
||||
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
Dataset<Tuple2<String, T>> claim = jsc
|
||||
.broadcast(readFromPath(spark, claimPath, clazz))
|
||||
.getValue()
|
||||
Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
|
||||
.map(
|
||||
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
|
||||
|
||||
raw
|
||||
Dataset<T> merged = raw
|
||||
.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
|
||||
|
@ -107,28 +100,9 @@ public class MergeClaimsApplication {
|
|||
: opClaim.isPresent() ? opClaim.get()._2() : null;
|
||||
},
|
||||
Encoders.bean(clazz))
|
||||
.filter(Objects::nonNull)
|
||||
.map(
|
||||
(MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value),
|
||||
Encoders.STRING())
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.text(outPath);
|
||||
.filter(Objects::nonNull);
|
||||
|
||||
saveGraphTable(merged, clazz, outputGraph, format);
|
||||
}
|
||||
|
||||
private static <T extends Oaf> Dataset<T> readFromPath(
|
||||
SparkSession spark, String path, Class<T> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(path)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
|
||||
Encoders.bean(clazz))
|
||||
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
|
||||
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
||||
|
@ -53,9 +45,9 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
|
||||
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -18,7 +16,7 @@ import org.dom4j.Node;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.common.PacePerson;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
|
||||
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -19,7 +17,7 @@ import org.dom4j.Node;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.common.PacePerson;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
|
|
|
@ -15,4 +15,8 @@
|
|||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -2,18 +2,32 @@
|
|||
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphInputPath</name>
|
||||
<description>the input path to read graph content</description>
|
||||
<name>inputGraph</name>
|
||||
<description>the input graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphOutputPath</name>
|
||||
<description>the target path to store cleaned graph</description>
|
||||
<name>outputGraph</name>
|
||||
<description>the output graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<name>isLookUpUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputGraphFormat</name>
|
||||
<value>HIVE</value>
|
||||
<description>the input graph data format</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputGraphFormat</name>
|
||||
<value>HIVE</value>
|
||||
<description>the output graph data format</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<description>hive server metastore URIs</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -48,14 +62,70 @@
|
|||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
<value>spark 2.* db directory location</value>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="fork_clean_graph"/>
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="should_drop_db"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<decision name="should_drop_db">
|
||||
<switch>
|
||||
<case to="fork_clean_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
|
||||
<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
|
||||
<default to="reset_DB"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="reset_DB">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>reset_DB</name>
|
||||
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--dbName</arg><arg>${outputGraph}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="fork_clean_graph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_clean_graph">
|
||||
<path start="clean_publication"/>
|
||||
<path start="clean_dataset"/>
|
||||
|
@ -82,12 +152,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -108,12 +182,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -134,12 +212,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -160,12 +242,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -186,12 +272,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -212,12 +302,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -238,12 +332,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -264,12 +362,16 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
|
||||
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
},
|
||||
{
|
||||
"paramName": "isu",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "the url of the ISLookupService",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
|
|
@ -11,6 +11,10 @@
|
|||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
|
@ -19,8 +23,4 @@
|
|||
<name>hiveJdbcUrl</name>
|
||||
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveDbName</name>
|
||||
<value>openaire</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -51,6 +51,10 @@
|
|||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
<value>spark 2.* db directory location</value>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
|
|
|
@ -6,20 +6,32 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "in",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the path to the graph data dump to read",
|
||||
"paramName": "ig",
|
||||
"paramLongName": "inputGraph",
|
||||
"paramDescription": "the input graph name (or path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path to store the output graph",
|
||||
"paramName": "og",
|
||||
"paramLongName": "outputGraph",
|
||||
"paramDescription": "the output graph name (or path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "igf",
|
||||
"paramLongName": "inputGraphFormat",
|
||||
"paramDescription": "the input graph data format",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ogf",
|
||||
"paramLongName": "outputGraphFormat",
|
||||
"paramDescription": "the output graph data format",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "isu",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "url to the ISLookup Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
@ -28,5 +40,11 @@
|
|||
"paramLongName": "graphTableClassName",
|
||||
"paramDescription": "class name moelling the graph table",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "hmu",
|
||||
"paramLongName": "hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
||||
|
|
|
@ -2,22 +2,36 @@
|
|||
|
||||
<parameters>
|
||||
<property>
|
||||
<name>betaInputGgraphPath</name>
|
||||
<description>the beta graph root path</description>
|
||||
<name>betaInputGraph</name>
|
||||
<description>the beta graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>prodInputGgraphPath</name>
|
||||
<description>the production graph root path</description>
|
||||
<name>prodInputGraph</name>
|
||||
<description>the production graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphOutputPath</name>
|
||||
<description>the output merged graph root path</description>
|
||||
<name>outputGraph</name>
|
||||
<description>the merged graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>priority</name>
|
||||
<description>decides from which infrastructure the content must win in case of ID clash</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputGraphFormat</name>
|
||||
<value>HIVE</value>
|
||||
<description>the input graph data format</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputGraphFormat</name>
|
||||
<value>HIVE</value>
|
||||
<description>the output graph data format</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<description>hive server metastore URIs</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -52,14 +66,51 @@
|
|||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
<value>spark 2.* db directory location</value>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="fork_merge_graph"/>
|
||||
<start to="should_drop_db"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<decision name="should_drop_db">
|
||||
<switch>
|
||||
<case to="fork_merge_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
|
||||
<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
|
||||
<default to="reset_DB"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="reset_DB">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>reset_DB</name>
|
||||
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--dbName</arg><arg>${outputGraph}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="fork_merge_graph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_merge_graph">
|
||||
<path start="merge_publication"/>
|
||||
<path start="merge_dataset"/>
|
||||
|
@ -86,13 +137,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -113,13 +168,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -140,13 +199,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -167,13 +230,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -194,13 +261,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -221,13 +292,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -248,13 +323,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -275,13 +354,17 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
|
||||
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
|
||||
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
|
||||
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
|
||||
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
||||
<arg>--priority</arg><arg>${priority}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -6,21 +6,27 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "rgp",
|
||||
"paramLongName": "rawGraphPath",
|
||||
"paramDescription": "the raw graph path",
|
||||
"paramName": "og",
|
||||
"paramLongName": "outputGraph",
|
||||
"paramDescription": "the output graph (db name | path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "gf",
|
||||
"paramLongName": "graphFormat",
|
||||
"paramDescription": "graph save format (json|parquet)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "cgp",
|
||||
"paramLongName": "claimsGraphPath",
|
||||
"paramDescription": "the path of the claims graph",
|
||||
"paramDescription": "the path of the claims graph, from the working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ogp",
|
||||
"paramLongName": "outputRawGaphPath",
|
||||
"paramDescription": "the path of output graph, combining raw and claims",
|
||||
"paramName": "rgp",
|
||||
"paramLongName": "rawGraphPath",
|
||||
"paramDescription": "the path of raw graph, from the working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
@ -28,5 +34,11 @@
|
|||
"paramLongName": "graphTableClassName",
|
||||
"paramDescription": "class name associated to the input entity path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "hmu",
|
||||
"paramLongName": "hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -7,20 +7,32 @@
|
|||
},
|
||||
{
|
||||
"paramName": "bin",
|
||||
"paramLongName": "betaInputPath",
|
||||
"paramDescription": "the beta graph root path",
|
||||
"paramLongName": "betaInputGraph",
|
||||
"paramDescription": "the beta graph name (or path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pin",
|
||||
"paramLongName": "prodInputPath",
|
||||
"paramDescription": "the production graph root path",
|
||||
"paramLongName": "prodInputGraph",
|
||||
"paramDescription": "the production graph name (or path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the output merged graph root path",
|
||||
"paramLongName": "outputGraph",
|
||||
"paramDescription": "the output merged graph (or path)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "igf",
|
||||
"paramLongName": "inputGraphFormat",
|
||||
"paramDescription": "the format of the input graphs",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ogf",
|
||||
"paramLongName": "outputGraphFormat",
|
||||
"paramDescription": "the format of the output merged graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
@ -34,5 +46,11 @@
|
|||
"paramLongName": "priority",
|
||||
"paramDescription": "decides from which infrastructure the content must win in case of ID clash",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "hmu",
|
||||
"paramLongName": "hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -2,8 +2,13 @@
|
|||
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphOutputPath</name>
|
||||
<description>the target path to store raw graph</description>
|
||||
<name>outputGraph</name>
|
||||
<description>the target graph name (or path)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphFormat</name>
|
||||
<value>HIVE</value>
|
||||
<description>the graph data format</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>reuseContent</name>
|
||||
|
@ -40,7 +45,7 @@
|
|||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<name>isLookUpUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -48,6 +53,11 @@
|
|||
<value></value>
|
||||
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<description>hive server metastore URIs</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -82,6 +92,10 @@
|
|||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkSqlWarehouseDir</name>
|
||||
<value>spark 2.* db directory location</value>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
|
@ -132,7 +146,7 @@
|
|||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--action</arg><arg>claims</arg>
|
||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||
|
@ -185,7 +199,7 @@
|
|||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
|
||||
</java>
|
||||
|
@ -269,7 +283,7 @@
|
|||
</spark-opts>
|
||||
<arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims,${contentPath}/oaf_records_invisible</arg>
|
||||
<arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
</spark>
|
||||
<ok to="GenerateGraph_claims"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -316,7 +330,7 @@
|
|||
</spark-opts>
|
||||
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
|
||||
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
</spark>
|
||||
<ok to="GenerateGraph"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -346,7 +360,40 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_graphs" to="fork_merge_claims"/>
|
||||
<join name="wait_graphs" to="should_drop_db"/>
|
||||
|
||||
<decision name="should_drop_db">
|
||||
<switch>
|
||||
<case to="fork_merge_claims">${wf:conf('graphFormat') eq 'JSON'}</case>
|
||||
<case to="reset_DB">${wf:conf('graphFormat') eq 'HIVE'}</case>
|
||||
<default to="reset_DB"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="reset_DB">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>reset_DB</name>
|
||||
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--dbName</arg><arg>${outputGraph}</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="fork_merge_claims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_merge_claims">
|
||||
<path start="merge_claims_publication"/>
|
||||
|
@ -359,7 +406,6 @@
|
|||
<path start="merge_claims_relation"/>
|
||||
</fork>
|
||||
|
||||
|
||||
<action name="merge_claims_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
|
@ -375,12 +421,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -401,12 +450,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -427,12 +479,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -453,12 +508,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=1920
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -479,12 +537,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=1920
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -505,12 +566,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=200
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -531,12 +595,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=200
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -557,12 +624,15 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=200
|
||||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
|
||||
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -20,8 +18,12 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.common.CleaningRuleMap;
|
||||
import eu.dnetlib.dhp.common.OafCleaner;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
|
@ -62,12 +64,10 @@ public class CleaningFunctionTest {
|
|||
assertTrue(p_in instanceof Result);
|
||||
assertTrue(p_in instanceof Publication);
|
||||
|
||||
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
|
||||
Publication p_out = OafCleaner.apply(p_in, mapping);
|
||||
|
||||
assertNotNull(p_out);
|
||||
|
||||
assertNotNull(p_out.getPublisher());
|
||||
assertNull(p_out.getPublisher().getValue());
|
||||
assertEquals("und", p_out.getLanguage().getClassid());
|
||||
assertEquals("Undetermined", p_out.getLanguage().getClassname());
|
||||
|
||||
|
@ -90,16 +90,6 @@ public class CleaningFunctionTest {
|
|||
|
||||
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
|
||||
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
|
||||
assertNull(p_out.getPublisher());
|
||||
|
||||
getAuthorPids(p_defaults).forEach(pid -> {
|
||||
System.out
|
||||
.println(
|
||||
String
|
||||
.format(
|
||||
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
|
||||
pid.getQualifier().getClassname()));
|
||||
});
|
||||
|
||||
// TODO add more assertions to verity the cleaned values
|
||||
System.out.println(MAPPER.writeValueAsString(p_out));
|
||||
|
@ -109,7 +99,7 @@ public class CleaningFunctionTest {
|
|||
*/
|
||||
}
|
||||
|
||||
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
|
||||
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
|
||||
return pub
|
||||
.getAuthor()
|
||||
.stream()
|
||||
|
@ -118,14 +108,6 @@ public class CleaningFunctionTest {
|
|||
.map(s -> s.getQualifier());
|
||||
}
|
||||
|
||||
private Stream<StructuredProperty> getAuthorPids(Result pub) {
|
||||
return pub
|
||||
.getAuthor()
|
||||
.stream()
|
||||
.map(a -> a.getPid())
|
||||
.flatMap(p -> p.stream());
|
||||
}
|
||||
|
||||
private List<String> vocs() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||
|
|
|
@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
|
|
|
@ -27,8 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.common.OafMapperUtils;
|
||||
import eu.dnetlib.dhp.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
|
|
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -20,8 +21,6 @@ import org.junit.jupiter.api.io.TempDir;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
|
@ -31,7 +30,8 @@ public class PrepareRelationsJobTest {
|
|||
|
||||
public static final String SUBRELTYPE = "subRelType";
|
||||
public static final String OUTCOME = "outcome";
|
||||
public static final String SUPPLEMENT = "supplement";
|
||||
public static final String PARTICIPATION = "participation";
|
||||
public static final String AFFILIATION = "affiliation";
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
|
@ -64,7 +64,7 @@ public class PrepareRelationsJobTest {
|
|||
@Test
|
||||
public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception {
|
||||
|
||||
final int maxRelations = 10;
|
||||
final int maxRelations = 20;
|
||||
PrepareRelationsJob
|
||||
.main(
|
||||
new String[] {
|
||||
|
@ -73,7 +73,8 @@ public class PrepareRelationsJobTest {
|
|||
"-outputPath", testPath.toString(),
|
||||
"-relPartitions", "10",
|
||||
"-relationFilter", "asd",
|
||||
"-maxRelations", String.valueOf(maxRelations)
|
||||
"-sourceMaxRelations", String.valueOf(maxRelations),
|
||||
"-targetMaxRelations", String.valueOf(maxRelations * 100)
|
||||
});
|
||||
|
||||
Dataset<Relation> out = spark
|
||||
|
@ -82,19 +83,31 @@ public class PrepareRelationsJobTest {
|
|||
.as(Encoders.bean(Relation.class))
|
||||
.cache();
|
||||
|
||||
Assertions.assertEquals(10, out.count());
|
||||
Assertions.assertEquals(maxRelations, out.count());
|
||||
|
||||
Dataset<Row> freq = out
|
||||
.toDF()
|
||||
.cube(SUBRELTYPE)
|
||||
.count()
|
||||
.filter((FilterFunction<Row>) value -> !value.isNullAt(0));
|
||||
long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count");
|
||||
long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");
|
||||
|
||||
Assertions.assertTrue(outcome > supplement);
|
||||
log.info(freq.collectAsList().toString());
|
||||
|
||||
long outcome = getRows(freq, OUTCOME).get(0).getAs("count");
|
||||
long participation = getRows(freq, PARTICIPATION).get(0).getAs("count");
|
||||
long affiliation = getRows(freq, AFFILIATION).get(0).getAs("count");
|
||||
|
||||
Assertions.assertTrue(participation == outcome);
|
||||
Assertions.assertTrue(outcome > affiliation);
|
||||
Assertions.assertTrue(participation > affiliation);
|
||||
|
||||
Assertions.assertEquals(7, outcome);
|
||||
Assertions.assertEquals(3, supplement);
|
||||
Assertions.assertEquals(7, participation);
|
||||
Assertions.assertEquals(6, affiliation);
|
||||
}
|
||||
|
||||
protected List<Row> getRows(Dataset<Row> freq, String col) {
|
||||
return freq.filter(freq.col(SUBRELTYPE).equalTo(col)).collectAsList();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>dhp-workflows</artifactId>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<version>1.2.4-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>dhp-workflows-common</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
|
@ -7,7 +7,6 @@ import java.util.HashMap;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|
@ -0,0 +1,10 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
public enum GraphFormat {
|
||||
|
||||
JSON, HIVE;
|
||||
|
||||
public static GraphFormat DEFAULT = JSON;
|
||||
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
|
||||
public class GraphSupport {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static <T extends Oaf> void deleteGraphTable(SparkSession spark, Class<T> clazz, String outputGraph,
|
||||
GraphFormat graphFormat) {
|
||||
switch (graphFormat) {
|
||||
|
||||
case JSON:
|
||||
String outPath = outputGraph + "/" + clazz.getSimpleName().toLowerCase();
|
||||
removeOutputDir(spark, outPath);
|
||||
break;
|
||||
case HIVE:
|
||||
String table = ModelSupport.tableIdentifier(outputGraph, clazz);
|
||||
String sql = String.format("DROP TABLE IF EXISTS %s PURGE", table);
|
||||
log.info("running SQL: '{}'", sql);
|
||||
spark.sql(sql);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
|
||||
GraphFormat graphFormat) {
|
||||
|
||||
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
|
||||
switch (graphFormat) {
|
||||
case JSON:
|
||||
String type = clazz.getSimpleName().toLowerCase();
|
||||
String outPath = outputGraph + "/" + type;
|
||||
|
||||
log.info("saving graph to path {},", outPath);
|
||||
|
||||
writer.option("compression", "gzip").json(outPath);
|
||||
break;
|
||||
case HIVE:
|
||||
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
|
||||
|
||||
log.info("saving graph to '{}'", db_table);
|
||||
|
||||
writer.saveAsTable(db_table);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public static <T extends Oaf> Dataset<T> readGraph(
|
||||
SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
|
||||
|
||||
log.info("reading graph {}, format {}, class {}", graph, format, clazz);
|
||||
|
||||
Encoder<T> encoder = Encoders.bean(clazz);
|
||||
|
||||
switch (format) {
|
||||
case JSON:
|
||||
String path = graph + "/" + clazz.getSimpleName().toLowerCase();
|
||||
log.info("reading path {}", path);
|
||||
return spark
|
||||
.read()
|
||||
.textFile(path)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
|
||||
encoder)
|
||||
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
|
||||
case HIVE:
|
||||
String table = ModelSupport.tableIdentifier(graph, clazz);
|
||||
log.info("reading table {}", table);
|
||||
|
||||
return spark.table(table).as(encoder);
|
||||
default:
|
||||
throw new IllegalStateException(String.format("format not managed: '%s'", format));
|
||||
}
|
||||
}
|
||||
|
||||
public static <T extends Oaf> Dataset<T> readGraphJSON(SparkSession spark, String graph, Class<T> clazz) {
|
||||
return readGraph(spark, graph, clazz, GraphFormat.JSON);
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
|
@ -1,11 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
@ -13,15 +9,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
|
||||
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
public class OafMapperUtils {
|
|
@ -0,0 +1,60 @@
|
|||
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
||||
public class ResetHiveDbApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ResetHiveDbApplication.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
ResetHiveDbApplication.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/common/reset_hive_db_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String dbName = Optional
|
||||
.ofNullable(parser.get("dbName"))
|
||||
.orElseThrow(() -> new IllegalArgumentException("missing DB name"));
|
||||
log.info("dbName: {}", dbName);
|
||||
|
||||
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
|
||||
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", hiveMetastoreUris);
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
runSQL(spark, String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
|
||||
runSQL(spark, String.format("CREATE DATABASE %s", dbName));
|
||||
});
|
||||
}
|
||||
|
||||
protected static void runSQL(SparkSession spark, String sql) {
|
||||
log.info("running SQL '{}'", sql);
|
||||
spark.sqlContext().sql(sql);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "hmu",
|
||||
"paramLongName": "hiveMetastoreUris",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "db",
|
||||
"paramLongName": "dbName",
|
||||
"paramDescription": "the graph db name",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
<modules>
|
||||
<module>dhp-workflow-profiles</module>
|
||||
<module>dhp-workflows-common</module>
|
||||
<module>dhp-aggregation</module>
|
||||
<module>dhp-distcp</module>
|
||||
<module>dhp-actionmanager</module>
|
||||
|
@ -269,7 +270,7 @@
|
|||
oozie.wf.application.path,projectVersion,oozie.use.system.libpath,
|
||||
oozieActionShareLibForSpark1,spark1YarnHistoryServerAddress,spark1EventLogDir,
|
||||
oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir,
|
||||
sparkSqlWarehouseDir
|
||||
sparkSqlWarehouseDir,hiveMetastoreUris
|
||||
</include>
|
||||
<includeSystemProperties>true</includeSystemProperties>
|
||||
<includePropertyKeysFromFiles>
|
||||
|
|
Loading…
Reference in New Issue