Compare commits

...

16 Commits

Author SHA1 Message Date
Claudio Atzori a5f23d8a4c WIP: materialize graph as Hive DB 2020-08-07 15:33:07 +02:00
Claudio Atzori cce21eafc2 WIP: materialize graph as Hive DB, configured spark actions to include hive support] 2020-08-06 21:48:29 +02:00
Claudio Atzori fde8738ed2 added hiveMetastoreUris parameter to default parameters 2020-08-06 21:46:47 +02:00
Claudio Atzori 3be0e5c2cd Merge branch 'master' into graph_db 2020-08-05 12:54:48 +02:00
Claudio Atzori f3ce97ecf9 WIP: materialize graph as Hive DB, mergeAggregatorGraphs [added workflow node to drop the DB] 2020-08-04 12:29:42 +02:00
Claudio Atzori 771bf8bcc4 WIP: materialize graph as Hive DB, mergeAggregatorGraphs 2020-08-04 12:26:09 +02:00
Claudio Atzori 0da1d2c0c9 introduced GraphFormat.DEFAULT, indicating a common value to be used across the workflows 2020-08-04 12:25:31 +02:00
Claudio Atzori 1fcc28968e integrated changes from master 2020-08-04 10:57:44 +02:00
Claudio Atzori da2f8af72d adjusted MergeClaimsApplication param specs 2020-08-03 19:56:16 +02:00
Claudio Atzori 4ff184973b code formatting 2020-07-30 11:33:03 +02:00
Claudio Atzori 9e594cf4c2 WIP: materialize graph as Hive DB, aggregator graph 2020-07-29 19:25:11 +02:00
Claudio Atzori ee0b2191f8 adjusted test assertions to reflect update ordering defined in SortableRelationKey 2020-07-29 14:37:35 +02:00
Claudio Atzori fd289d389c adjusted dedup stats block test assertions to reflect updated configuration 2020-07-29 14:36:52 +02:00
Claudio Atzori 2dbac631c9 WIP: factoring out utilities into dhp-workflows-common 2020-07-29 13:08:20 +02:00
Claudio Atzori 91811ab43a Merge branch 'master' into graph_db 2020-07-28 15:15:27 +02:00
Claudio Atzori b0b6c6bd47 WIP dhp-workflows-common 2020-07-28 14:59:14 +02:00
41 changed files with 834 additions and 337 deletions

View File

@@ -195,10 +195,10 @@ public class SparkDedupTest implements Serializable {
 .count();
 assertEquals(3432, orgs_simrel);
-assertEquals(7152, pubs_simrel);
-assertEquals(344, sw_simrel);
+assertEquals(6944, pubs_simrel);
+assertEquals(318, sw_simrel);
 assertEquals(458, ds_simrel);
-assertEquals(6750, orp_simrel);
+assertEquals(6746, orp_simrel);
 }
 @Test
@@ -344,10 +344,10 @@ public class SparkDedupTest implements Serializable {
 .count();
 assertEquals(1276, orgs_mergerel);
-assertEquals(1442, pubs_mergerel);
-assertEquals(288, sw_mergerel);
+assertEquals(1418, pubs_mergerel);
+assertEquals(276, sw_mergerel);
 assertEquals(472, ds_mergerel);
-assertEquals(718, orp_mergerel);
+assertEquals(716, orp_mergerel);
 }
 @Test
@@ -391,8 +391,8 @@ public class SparkDedupTest implements Serializable {
 .count();
 assertEquals(82, orgs_deduprecord);
-assertEquals(66, pubs_deduprecord);
-assertEquals(51, sw_deduprecord);
+assertEquals(65, pubs_deduprecord);
+assertEquals(50, sw_deduprecord);
 assertEquals(96, ds_deduprecord);
 assertEquals(89, orp_deduprecord);
 }
@@ -473,11 +473,11 @@ public class SparkDedupTest implements Serializable {
 .distinct()
 .count();
-assertEquals(897, publications);
+assertEquals(896, publications);
 assertEquals(835, organizations);
 assertEquals(100, projects);
 assertEquals(100, datasource);
-assertEquals(200, softwares);
+assertEquals(199, softwares);
 assertEquals(388, dataset);
 assertEquals(517, otherresearchproduct);
@@ -533,7 +533,7 @@ public class SparkDedupTest implements Serializable {
 long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
-assertEquals(4866, relations);
+assertEquals(4828, relations);
 // check deletedbyinference
 final Dataset<Relation> mergeRels = spark

View File

@@ -168,10 +168,10 @@ public class SparkStatsTest implements Serializable {
 .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
 .count();
-assertEquals(121, orgs_blocks);
-assertEquals(110, pubs_blocks);
-assertEquals(21, sw_blocks);
-assertEquals(67, ds_blocks);
-assertEquals(55, orp_blocks);
+assertEquals(549, orgs_blocks);
+assertEquals(868, pubs_blocks);
+assertEquals(473, sw_blocks);
+assertEquals(523, ds_blocks);
+assertEquals(756, orp_blocks);
 }
 }

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree" : {

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree" : {

View File

@@ -29,8 +29,7 @@
 },
 "pace": {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree": {

View File

@@ -17,8 +17,7 @@
 },
 "pace" : {
 "clustering" : [
-{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
-{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
 { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
 ],
 "decisionTree": {

View File

@@ -71,6 +71,11 @@
 <artifactId>dhp-schemas</artifactId>
 <version>${project.version}</version>
 </dependency>
+<dependency>
+<groupId>eu.dnetlib.dhp</groupId>
+<artifactId>dhp-workflows-common</artifactId>
+<version>${project.version}</version>
+</dependency>
 <dependency>
 <groupId>com.jayway.jsonpath</groupId>

View File

@@ -1,9 +1,10 @@
 package eu.dnetlib.dhp.oa.graph.clean;
+import static eu.dnetlib.dhp.common.GraphSupport.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.io.BufferedInputStream;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -14,7 +15,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,10 +22,8 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.*;
 import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -53,50 +51,64 @@ public class CleanGraphSparkJob {
 .orElse(Boolean.TRUE);
 log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-String inputPath = parser.get("inputPath");
-log.info("inputPath: {}", inputPath);
-String outputPath = parser.get("outputPath");
-log.info("outputPath: {}", outputPath);
-String isLookupUrl = parser.get("isLookupUrl");
-log.info("isLookupUrl: {}", isLookupUrl);
+String inputGraph = parser.get("inputGraph");
+log.info("inputGraph: {}", inputGraph);
+String outputGraph = parser.get("outputGraph");
+log.info("outputGraph: {}", outputGraph);
+GraphFormat inputGraphFormat = Optional
+.ofNullable(parser.get("inputGraphFormat"))
+.map(GraphFormat::valueOf)
+.orElse(GraphFormat.DEFAULT);
+log.info("inputGraphFormat: {}", inputGraphFormat);
+GraphFormat outputGraphFormat = Optional
+.ofNullable(parser.get("outputGraphFormat"))
+.map(GraphFormat::valueOf)
+.orElse(GraphFormat.DEFAULT);
+log.info("outputGraphFormat: {}", outputGraphFormat);
+String isLookUpUrl = parser.get("isLookUpUrl");
+log.info("isLookUpUrl: {}", isLookUpUrl);
 String graphTableClassName = parser.get("graphTableClassName");
 log.info("graphTableClassName: {}", graphTableClassName);
-Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
-final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
-final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
+String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
 SparkConf conf = new SparkConf();
-runWithSparkSession(
+conf.set("hive.metastore.uris", hiveMetastoreUris);
+Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
+final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
+final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
+runWithSparkHiveSession(
 conf,
 isSparkSessionManaged,
-spark -> {
-removeOutputDir(spark, outputPath);
-fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
-});
+spark -> cleanGraphTable(spark, vocs, inputGraph, inputGraphFormat, outputGraph, outputGraphFormat, clazz));
 }
-private static <T extends Oaf> void fixGraphTable(
+private static <T extends Oaf> void cleanGraphTable(
 SparkSession spark,
 VocabularyGroup vocs,
-String inputPath,
-Class<T> clazz,
-String outputPath) {
+String inputGraph,
+GraphFormat inputGraphFormat,
+String outputGraph,
+GraphFormat outputGraphFormat,
+Class<T> clazz) {
 final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
-readTableFromPath(spark, inputPath, clazz)
+Dataset<T> cleaned = readGraph(spark, inputGraph, clazz, inputGraphFormat)
 .map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
 .map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
-.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
-.write()
-.mode(SaveMode.Overwrite)
-.option("compression", "gzip")
-.json(outputPath);
+.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz));
+saveGraphTable(cleaned, clazz, outputGraph, outputGraphFormat);
 }
 protected static <T extends Oaf> T fixVocabularyNames(T value) {

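The GraphFormat enum and the readGraph/saveGraphTable helpers used above come from the new dhp-workflows-common module and are not included in this compare view. A minimal sketch of the dispatch they imply, assuming one Hive table per entity type named after the class and gzipped JSON dumps for the path-based layout (the JSON/HIVE constants come from the workflow parameters below; every other detail here is an assumption, not the project's actual code):

    package eu.dnetlib.dhp.common;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical sketch: only the call signatures are taken from the jobs in this diff;
    // the bodies and the DEFAULT choice are assumptions.
    enum GraphFormat {
        JSON, HIVE;
        static final GraphFormat DEFAULT = HIVE; // assumed default, cf. GraphFormat.DEFAULT above
    }

    public class GraphSupportSketch {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        // read one graph table, either "<db>.<table>" from Hive or "<path>/<table>" as JSON text
        public static <T> Dataset<T> readGraph(SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
            String table = clazz.getSimpleName().toLowerCase(); // assumption: table named after the entity
            if (format == GraphFormat.HIVE) {
                return spark.table(graph + "." + table).as(Encoders.bean(clazz));
            }
            return spark
                .read()
                .textFile(graph + "/" + table)
                .map((MapFunction<String, T>) json -> MAPPER.readValue(json, clazz), Encoders.bean(clazz));
        }

        // persist one graph table, mirroring the .write() chains removed from the jobs above
        public static <T> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String graph, GraphFormat format) {
            String table = clazz.getSimpleName().toLowerCase();
            if (format == GraphFormat.HIVE) {
                dataset.write().mode(SaveMode.Overwrite).saveAsTable(graph + "." + table);
            } else {
                dataset.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(graph + "/" + table);
            }
        }
    }

Under the HIVE format the output graph is just a database name, which is why the workflows further down drop and recreate that database before writing.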
View File

@@ -1,6 +1,9 @@
 package eu.dnetlib.dhp.oa.graph.merge;
+import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable;
+import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.util.Objects;
@@ -20,7 +23,8 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.GraphFormat;
+import eu.dnetlib.dhp.common.GraphSupport;
 import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
@@ -35,8 +39,6 @@ public class MergeGraphSparkJob {
 private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
-private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
 public static void main(String[] args) throws Exception {
@@ -60,46 +62,61 @@ public class MergeGraphSparkJob {
 .orElse(Boolean.TRUE);
 log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-String betaInputPath = parser.get("betaInputPath");
-log.info("betaInputPath: {}", betaInputPath);
-String prodInputPath = parser.get("prodInputPath");
-log.info("prodInputPath: {}", prodInputPath);
-String outputPath = parser.get("outputPath");
-log.info("outputPath: {}", outputPath);
+String betaInputGraph = parser.get("betaInputGraph");
+log.info("betaInputGraph: {}", betaInputGraph);
+String prodInputGraph = parser.get("prodInputGraph");
+log.info("prodInputGraph: {}", prodInputGraph);
+String outputGraph = parser.get("outputGraph");
+log.info("outputGraph: {}", outputGraph);
+GraphFormat inputGraphFormat = Optional
+.ofNullable(parser.get("inputGraphFormat"))
+.map(GraphFormat::valueOf)
+.orElse(GraphFormat.DEFAULT);
+log.info("inputGraphFormat: {}", inputGraphFormat);
+GraphFormat outputGraphFormat = Optional
+.ofNullable(parser.get("outputGraphFormat"))
+.map(GraphFormat::valueOf)
+.orElse(GraphFormat.DEFAULT);
+log.info("outputGraphFormat: {}", outputGraphFormat);
 String graphTableClassName = parser.get("graphTableClassName");
 log.info("graphTableClassName: {}", graphTableClassName);
-Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
+Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
+String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
 SparkConf conf = new SparkConf();
+conf.set("hive.metastore.uris", hiveMetastoreUris);
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 conf.registerKryoClasses(ModelSupport.getOafModelClasses());
-runWithSparkSession(
+runWithSparkHiveSession(
 conf,
 isSparkSessionManaged,
-spark -> {
-removeOutputDir(spark, outputPath);
-mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
-});
+spark -> mergeGraphTable(
+spark, priority, betaInputGraph, clazz, prodInputGraph, clazz, outputGraph, inputGraphFormat,
+outputGraphFormat));
 }
 private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
 SparkSession spark,
 String priority,
-String betaInputPath,
-String prodInputPath,
-Class<P> p_clazz,
+String betaInputGraph,
 Class<B> b_clazz,
-String outputPath) {
+String prodInputGraph,
+Class<P> p_clazz,
+String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) {
-Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
-Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
-prod
+Dataset<Tuple2<String, B>> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat);
+Dataset<Tuple2<String, P>> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat);
+Dataset<P> merged = prod
 .joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
 .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
 Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
@@ -112,11 +129,9 @@ public class MergeGraphSparkJob {
 return mergeWithPriorityToPROD(p, b);
 }
 }, Encoders.bean(p_clazz))
-.filter((FilterFunction<P>) Objects::nonNull)
-.write()
-.mode(SaveMode.Overwrite)
-.option("compression", "gzip")
-.json(outputPath);
+.filter((FilterFunction<P>) Objects::nonNull);
+saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat);
 }
 private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
@@ -139,24 +154,16 @@ public class MergeGraphSparkJob {
 return null;
 }
-private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
-SparkSession spark, String inputEntityPath, Class<T> clazz) {
-log.info("Reading Graph table from: {}", inputEntityPath);
-return spark
-.read()
-.textFile(inputEntityPath)
-.map(
-(MapFunction<String, Tuple2<String, T>>) value -> {
-final T t = OBJECT_MAPPER.readValue(value, clazz);
-final String id = ModelSupport.idFn().apply(t);
-return new Tuple2<>(id, t);
-},
-Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
-}
-private static void removeOutputDir(SparkSession spark, String path) {
-HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+private static <T extends Oaf> Dataset<Tuple2<String, T>> readGraph(
+SparkSession spark, String inputGraph, Class<T> clazz, GraphFormat inputGraphFormat) {
+log.info("Reading Graph table from: {}", inputGraph);
+return GraphSupport
+.readGraph(spark, inputGraph, clazz, inputGraphFormat)
+.map((MapFunction<T, Tuple2<String, T>>) t -> {
+final String id = ModelSupport.idFn().apply(t);
+return new Tuple2<>(id, t);
+}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
 }
 }

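The join above keys both graphs by the entity identifier (ModelSupport.idFn()) and resolves ID clashes with mergeWithPriorityToPROD / mergeWithPriorityToBETA, whose bodies fall outside these hunks. A rough sketch of the PROD-priority case, assuming the straightforward "keep the PROD record when both sides have one" reading suggested by the method name (not the project's actual implementation):

    import java.util.Optional;

    // Hypothetical illustration of the clash resolution named above: the PROD record wins
    // whenever it exists, otherwise the BETA record is kept. The job invokes it with the
    // same concrete class on both sides (clazz, clazz), so the cast is safe in that setting.
    class MergePrioritySketch {
        static <P, B> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b, Class<P> pClazz) {
            if (p.isPresent()) {
                return p.get();
            }
            return b.map(pClazz::cast).orElse(null);
        }
    }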
View File

@@ -1,7 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 import java.util.*;
@@ -12,7 +12,7 @@ import org.dom4j.DocumentFactory;
 import org.dom4j.DocumentHelper;
 import org.dom4j.Node;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.LicenseComparator;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;

View File

@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
@@ -70,10 +70,10 @@ public class GenerateEntitiesApplication {
 final String targetPath = parser.get("targetPath");
 log.info("targetPath: {}", targetPath);
-final String isLookupUrl = parser.get("isLookupUrl");
-log.info("isLookupUrl: {}", isLookupUrl);
-final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+final String isLookUpUrl = parser.get("isLookUpUrl");
+log.info("isLookUpUrl: {}", isLookUpUrl);
+final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
 final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
 final SparkConf conf = new SparkConf();

View File

@@ -1,7 +1,8 @@
 package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.common.GraphSupport.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import java.util.Objects;
 import java.util.Optional;
@@ -9,29 +10,23 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.GraphFormat;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
 import scala.Tuple2;
 public class MergeClaimsApplication {
 private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
-private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 public static void main(final String[] args) throws Exception {
 final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 IOUtils
@@ -53,49 +48,47 @@ public class MergeClaimsApplication {
 final String claimsGraphPath = parser.get("claimsGraphPath");
 log.info("claimsGraphPath: {}", claimsGraphPath);
-final String outputRawGaphPath = parser.get("outputRawGaphPath");
-log.info("outputRawGaphPath: {}", outputRawGaphPath);
+final String outputGraph = parser.get("outputGraph");
+log.info("outputGraph: {}", outputGraph);
+final GraphFormat format = Optional
+.ofNullable(parser.get("format"))
+.map(GraphFormat::valueOf)
+.orElse(GraphFormat.DEFAULT);
+log.info("graphFormat: {}", format);
 String graphTableClassName = parser.get("graphTableClassName");
 log.info("graphTableClassName: {}", graphTableClassName);
 Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
+String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
 SparkConf conf = new SparkConf();
+conf.set("hive.metastore.uris", hiveMetastoreUris);
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 conf.registerKryoClasses(ModelSupport.getOafModelClasses());
-runWithSparkSession(
+runWithSparkHiveSession(
 conf,
 isSparkSessionManaged,
-spark -> {
-String type = clazz.getSimpleName().toLowerCase();
-String rawPath = rawGraphPath + "/" + type;
-String claimPath = claimsGraphPath + "/" + type;
-String outPath = outputRawGaphPath + "/" + type;
-removeOutputDir(spark, outPath);
-mergeByType(spark, rawPath, claimPath, outPath, clazz);
-});
+spark -> mergeByType(spark, rawGraphPath, claimsGraphPath, outputGraph, format, clazz));
 }
 private static <T extends Oaf> void mergeByType(
-SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
+SparkSession spark, String rawPath, String claimPath, String outputGraph, GraphFormat format, Class<T> clazz) {
-Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
+Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
 .map(
 (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
 Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
-final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-Dataset<Tuple2<String, T>> claim = jsc
-.broadcast(readFromPath(spark, claimPath, clazz))
-.getValue()
+Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
 .map(
 (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
 Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
-raw
+Dataset<T> merged = raw
 .joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
 .map(
 (MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
@@ -107,28 +100,9 @@ public class MergeClaimsApplication {
 : opClaim.isPresent() ? opClaim.get()._2() : null;
 },
 Encoders.bean(clazz))
-.filter(Objects::nonNull)
-.map(
-(MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value),
-Encoders.STRING())
-.write()
-.mode(SaveMode.Overwrite)
-.option("compression", "gzip")
-.text(outPath);
+.filter(Objects::nonNull);
+saveGraphTable(merged, clazz, outputGraph, format);
 }
-private static <T extends Oaf> Dataset<T> readFromPath(
-SparkSession spark, String path, Class<T> clazz) {
-return spark
-.read()
-.textFile(path)
-.map(
-(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
-Encoders.bean(clazz))
-.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
-}
-private static void removeOutputDir(SparkSession spark, String path) {
-HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-}
 }

View File

@@ -1,15 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@@ -53,9 +45,9 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
 import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@@ -1,9 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 import java.util.ArrayList;
@@ -18,7 +16,7 @@ import org.dom4j.Node;
 import com.google.common.collect.Lists;
 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -1,9 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
-import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+import static eu.dnetlib.dhp.common.OafMapperUtils.*;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
 import java.util.ArrayList;
@@ -19,7 +17,7 @@ import org.dom4j.Node;
 import com.google.common.collect.Lists;
 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -15,4 +15,8 @@
 <name>oozie.action.sharelib.for.spark</name>
 <value>spark2</value>
 </property>
+<property>
+<name>hiveMetastoreUris</name>
+<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+</property>
 </configuration>

View File

@@ -2,18 +2,32 @@
 <parameters>
 <property>
-<name>graphInputPath</name>
-<description>the input path to read graph content</description>
+<name>inputGraph</name>
+<description>the input graph name (or path)</description>
 </property>
 <property>
-<name>graphOutputPath</name>
-<description>the target path to store cleaned graph</description>
+<name>outputGraph</name>
+<description>the output graph name (or path)</description>
 </property>
 <property>
-<name>isLookupUrl</name>
+<name>isLookUpUrl</name>
 <description>the address of the lookUp service</description>
 </property>
+<property>
+<name>inputGraphFormat</name>
+<value>HIVE</value>
+<description>the input graph data format</description>
+</property>
+<property>
+<name>outputGraphFormat</name>
+<value>HIVE</value>
+<description>the output graph data format</description>
+</property>
+<property>
+<name>hiveMetastoreUris</name>
+<description>hive server metastore URIs</description>
+</property>
 <property>
 <name>sparkDriverMemory</name>
 <description>memory for driver process</description>
@@ -48,14 +62,70 @@
 <name>spark2EventLogDir</name>
 <description>spark 2.* event log dir location</description>
 </property>
+<property>
+<name>sparkSqlWarehouseDir</name>
+<value>spark 2.* db directory location</value>
+</property>
 </parameters>
-<start to="fork_clean_graph"/>
+<global>
+<job-tracker>${jobTracker}</job-tracker>
+<name-node>${nameNode}</name-node>
+<configuration>
+<property>
+<name>mapreduce.job.queuename</name>
+<value>${queueName}</value>
+</property>
+<property>
+<name>oozie.launcher.mapred.job.queue.name</name>
+<value>${oozieLauncherQueueName}</value>
+</property>
+<property>
+<name>oozie.action.sharelib.for.spark</name>
+<value>${oozieActionShareLibForSpark2}</value>
+</property>
+</configuration>
+</global>
+<start to="should_drop_db"/>
 <kill name="Kill">
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
+<decision name="should_drop_db">
+<switch>
+<case to="fork_clean_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
+<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
+<default to="reset_DB"/>
+</switch>
+</decision>
+<action name="reset_DB">
+<spark xmlns="uri:oozie:spark-action:0.2">
+<master>yarn</master>
+<mode>cluster</mode>
+<name>reset_DB</name>
+<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
+<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+<spark-opts>
+--executor-memory ${sparkExecutorMemory}
+--executor-cores ${sparkExecutorCores}
+--driver-memory=${sparkDriverMemory}
+--conf spark.extraListeners=${spark2ExtraListeners}
+--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+--conf spark.sql.shuffle.partitions=7680
+</spark-opts>
+<arg>--dbName</arg><arg>${outputGraph}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+</spark>
+<ok to="fork_clean_graph"/>
+<error to="Kill"/>
+</action>
 <fork name="fork_clean_graph">
 <path start="clean_publication"/>
 <path start="clean_dataset"/>
@@ -82,12 +152,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -108,12 +182,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -134,12 +212,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -160,12 +242,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -186,12 +272,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -212,12 +302,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -238,12 +332,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>
@@ -264,12 +362,16 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
+<arg>--inputGraph</arg><arg>${inputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
-<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_clean"/>
 <error to="Kill"/>

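Both the clean and merge workflows now start from a should_drop_db decision and, when writing in HIVE format, run eu.dnetlib.dhp.common.ResetHiveDbApplication before forking per entity type; that class is not part of this compare view, and the decision node skips the step entirely when plain JSON output is requested. A plausible sketch of what "reset the target DB" could mean here (dropping and recreating the database through Spark SQL); everything beyond the --dbName and --hiveMetastoreUris arguments shown in the workflow is an assumption:

    package eu.dnetlib.dhp.common;

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical sketch of the reset_DB step: drop and recreate the output Hive database
    // so that each run starts from an empty DB. Argument handling is simplified; the real
    // application presumably goes through ArgumentApplicationParser like the other jobs.
    public class ResetHiveDbSketch {

        public static void main(String[] args) {
            final String dbName = args[0];             // value passed as --dbName
            final String hiveMetastoreUris = args[1];  // value passed as --hiveMetastoreUris

            SparkConf conf = new SparkConf();
            conf.set("hive.metastore.uris", hiveMetastoreUris);

            SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();

            // CASCADE removes any tables left over from a previous run
            spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
            spark.sql(String.format("CREATE DATABASE %s", dbName));

            spark.stop();
        }
    }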
View File

@@ -19,7 +19,7 @@
 },
 {
 "paramName": "isu",
-"paramLongName": "isLookupUrl",
+"paramLongName": "isLookUpUrl",
 "paramDescription": "the url of the ISLookupService",
 "paramRequired": true
 }

View File

@@ -11,6 +11,10 @@
 <name>oozie.use.system.libpath</name>
 <value>true</value>
 </property>
+<property>
+<name>oozie.action.sharelib.for.spark</name>
+<value>spark2</value>
+</property>
 <property>
 <name>hiveMetastoreUris</name>
 <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
@@ -19,8 +23,4 @@
 <name>hiveJdbcUrl</name>
 <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
 </property>
-<property>
-<name>hiveDbName</name>
-<value>openaire</value>
-</property>
 </configuration>

View File

@@ -51,6 +51,10 @@
 <name>spark2EventLogDir</name>
 <description>spark 2.* event log dir location</description>
 </property>
+<property>
+<name>sparkSqlWarehouseDir</name>
+<value>spark 2.* db directory location</value>
+</property>
 </parameters>
 <global>

View File

@@ -6,20 +6,32 @@
 "paramRequired": false
 },
 {
-"paramName": "in",
-"paramLongName": "inputPath",
-"paramDescription": "the path to the graph data dump to read",
+"paramName": "ig",
+"paramLongName": "inputGraph",
+"paramDescription": "the input graph name (or path)",
 "paramRequired": true
 },
 {
-"paramName": "out",
-"paramLongName": "outputPath",
-"paramDescription": "the path to store the output graph",
+"paramName": "og",
+"paramLongName": "outputGraph",
+"paramDescription": "the output graph name (or path)",
+"paramRequired": true
+},
+{
+"paramName": "igf",
+"paramLongName": "inputGraphFormat",
+"paramDescription": "the input graph data format",
+"paramRequired": true
+},
+{
+"paramName": "ogf",
+"paramLongName": "outputGraphFormat",
+"paramDescription": "the output graph data format",
 "paramRequired": true
 },
 {
 "paramName": "isu",
-"paramLongName": "isLookupUrl",
+"paramLongName": "isLookUpUrl",
 "paramDescription": "url to the ISLookup Service",
 "paramRequired": true
 },
@@ -28,5 +40,11 @@
 "paramLongName": "graphTableClassName",
 "paramDescription": "class name moelling the graph table",
 "paramRequired": true
+},
+{
+"paramName": "hmu",
+"paramLongName": "hiveMetastoreUris",
+"paramDescription": "the hive metastore uris",
+"paramRequired": true
 }
 ]

View File

@@ -2,22 +2,36 @@
 <parameters>
 <property>
-<name>betaInputGgraphPath</name>
-<description>the beta graph root path</description>
+<name>betaInputGraph</name>
+<description>the beta graph name (or path)</description>
 </property>
 <property>
-<name>prodInputGgraphPath</name>
-<description>the production graph root path</description>
+<name>prodInputGraph</name>
+<description>the production graph name (or path)</description>
 </property>
 <property>
-<name>graphOutputPath</name>
-<description>the output merged graph root path</description>
+<name>outputGraph</name>
+<description>the merged graph name (or path)</description>
 </property>
 <property>
 <name>priority</name>
 <description>decides from which infrastructure the content must win in case of ID clash</description>
 </property>
+<property>
+<name>inputGraphFormat</name>
+<value>HIVE</value>
+<description>the input graph data format</description>
+</property>
+<property>
+<name>outputGraphFormat</name>
+<value>HIVE</value>
+<description>the output graph data format</description>
+</property>
+<property>
+<name>hiveMetastoreUris</name>
+<description>hive server metastore URIs</description>
+</property>
 <property>
 <name>sparkDriverMemory</name>
 <description>memory for driver process</description>
@@ -52,14 +66,51 @@
 <name>spark2EventLogDir</name>
 <description>spark 2.* event log dir location</description>
 </property>
+<property>
+<name>sparkSqlWarehouseDir</name>
+<value>spark 2.* db directory location</value>
+</property>
 </parameters>
-<start to="fork_merge_graph"/>
+<start to="should_drop_db"/>
 <kill name="Kill">
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
+<decision name="should_drop_db">
+<switch>
+<case to="fork_merge_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
+<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
+<default to="reset_DB"/>
+</switch>
+</decision>
+<action name="reset_DB">
+<spark xmlns="uri:oozie:spark-action:0.2">
+<master>yarn</master>
+<mode>cluster</mode>
+<name>reset_DB</name>
+<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
+<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+<spark-opts>
+--executor-memory ${sparkExecutorMemory}
+--executor-cores ${sparkExecutorCores}
+--driver-memory=${sparkDriverMemory}
+--conf spark.extraListeners=${spark2ExtraListeners}
+--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+--conf spark.sql.shuffle.partitions=7680
+</spark-opts>
+<arg>--dbName</arg><arg>${outputGraph}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+</spark>
+<ok to="fork_merge_graph"/>
+<error to="Kill"/>
+</action>
 <fork name="fork_merge_graph">
 <path start="merge_publication"/>
 <path start="merge_dataset"/>
@@ -86,13 +137,17 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
-<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
+<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
 <arg>--priority</arg><arg>${priority}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_merge"/>
 <error to="Kill"/>
@@ -113,13 +168,17 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
-<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
+<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
+<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
 <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
 <arg>--priority</arg><arg>${priority}</arg>
+<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 </spark>
 <ok to="wait_merge"/>
 <error to="Kill"/>
@@ -140,13 +199,17 @@
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 --conf spark.sql.shuffle.partitions=7680
 </spark-opts>
-<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
-<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
-<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
+<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
+<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
+<arg>--outputGraph</arg><arg>${outputGraph}</arg>
+<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -167,13 +230,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg> <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg> <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -194,13 +261,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg> <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg> <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -221,13 +292,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg> <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg> <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -248,13 +323,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg> <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg> <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -275,13 +354,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg> <arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg> <arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg> <arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -6,21 +6,27 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "rgp", "paramName": "og",
"paramLongName": "rawGraphPath", "paramLongName": "outputGraph",
"paramDescription": "the raw graph path", "paramDescription": "the output graph (db name | path)",
"paramRequired": true
},
{
"paramName": "gf",
"paramLongName": "graphFormat",
"paramDescription": "graph save format (json|parquet)",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "cgp", "paramName": "cgp",
"paramLongName": "claimsGraphPath", "paramLongName": "claimsGraphPath",
"paramDescription": "the path of the claims graph", "paramDescription": "the path of the claims graph, from the working directory",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "ogp", "paramName": "rgp",
"paramLongName": "outputRawGaphPath", "paramLongName": "rawGraphPath",
"paramDescription": "the path of output graph, combining raw and claims", "paramDescription": "the path of raw graph, from the working directory",
"paramRequired": true "paramRequired": true
}, },
{ {
@ -28,5 +34,11 @@
"paramLongName": "graphTableClassName", "paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path", "paramDescription": "class name associated to the input entity path",
"paramRequired": true "paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
} }
] ]

View File

@ -7,20 +7,32 @@
}, },
{ {
"paramName": "bin", "paramName": "bin",
"paramLongName": "betaInputPath", "paramLongName": "betaInputGraph",
"paramDescription": "the beta graph root path", "paramDescription": "the beta graph name (or path)",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "pin", "paramName": "pin",
"paramLongName": "prodInputPath", "paramLongName": "prodInputGraph",
"paramDescription": "the production graph root path", "paramDescription": "the production graph name (or path)",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputGraph",
"paramDescription": "the output merged graph root path", "paramDescription": "the output merged graph (or path)",
"paramRequired": true
},
{
"paramName": "igf",
"paramLongName": "inputGraphFormat",
"paramDescription": "the format of the input graphs",
"paramRequired": true
},
{
"paramName": "ogf",
"paramLongName": "outputGraphFormat",
"paramDescription": "the format of the output merged graph",
"paramRequired": true "paramRequired": true
}, },
{ {
@ -34,5 +46,11 @@
"paramLongName": "priority", "paramLongName": "priority",
"paramDescription": "decides from which infrastructure the content must win in case of ID clash", "paramDescription": "decides from which infrastructure the content must win in case of ID clash",
"paramRequired": false "paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
} }
] ]
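
The spec above lists the long parameter names the graph-merge job receives from the workflow. A minimal sketch of reading them on the Spark side, following the ArgumentApplicationParser pattern used by ResetHiveDbApplication later in this diff; the resource path in the snippet is assumed for illustration, since the actual JSON file name is not visible in this view.

    import org.apache.commons.io.IOUtils;

    import eu.dnetlib.dhp.application.ArgumentApplicationParser;
    import eu.dnetlib.dhp.common.GraphFormat;

    public class MergeGraphArgsSketch {
        public static void main(String[] args) throws Exception {
            // Resource path assumed for illustration; the real spec is the JSON shown above.
            ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils
                    .toString(
                        MergeGraphArgsSketch.class
                            .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json")));
            parser.parseArgument(args);

            String betaInputGraph = parser.get("betaInputGraph");
            String prodInputGraph = parser.get("prodInputGraph");
            String outputGraph = parser.get("outputGraph");
            GraphFormat inputGraphFormat = GraphFormat.valueOf(parser.get("inputGraphFormat"));
            GraphFormat outputGraphFormat = GraphFormat.valueOf(parser.get("outputGraphFormat"));
            String priority = parser.get("priority");
            String hiveMetastoreUris = parser.get("hiveMetastoreUris");
            // ... build a SparkConf with hive.metastore.uris set and run the per-entity merge.
        }
    }
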

View File

@ -2,8 +2,13 @@
<parameters> <parameters>
<property> <property>
<name>graphOutputPath</name> <name>outputGraph</name>
<description>the target path to store raw graph</description> <description>the target graph name (or path)</description>
</property>
<property>
<name>graphFormat</name>
<value>HIVE</value>
<description>the graph data format</description>
</property> </property>
<property> <property>
<name>reuseContent</name> <name>reuseContent</name>
@ -40,7 +45,7 @@
<description>mongo database</description> <description>mongo database</description>
</property> </property>
<property> <property>
<name>isLookupUrl</name> <name>isLookUpUrl</name>
<description>the address of the lookUp service</description> <description>the address of the lookUp service</description>
</property> </property>
<property> <property>
@ -48,6 +53,11 @@
<value></value> <value></value>
<description>a blacklist of nsprefixes (comma separated)</description> <description>a blacklist of nsprefixes (comma separated)</description>
</property> </property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -82,6 +92,10 @@
<name>spark2EventLogDir</name> <name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description> <description>spark 2.* event log dir location</description>
</property> </property>
<property>
<name>sparkSqlWarehouseDir</name>
<description>spark 2.* db directory location</description>
</property>
</parameters> </parameters>
<global> <global>
@ -132,7 +146,7 @@
<arg>--postgresUrl</arg><arg>${postgresURL}</arg> <arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--action</arg><arg>claims</arg> <arg>--action</arg><arg>claims</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg> <arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg> <arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
@ -185,7 +199,7 @@
<arg>--postgresUrl</arg><arg>${postgresURL}</arg> <arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg> <arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg> <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg> <arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg> <arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java> </java>
@ -269,7 +283,7 @@
</spark-opts> </spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims,${contentPath}/oaf_records_invisible</arg> <arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims,${contentPath}/oaf_records_invisible</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg> <arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark> </spark>
<ok to="GenerateGraph_claims"/> <ok to="GenerateGraph_claims"/>
<error to="Kill"/> <error to="Kill"/>
@ -316,7 +330,7 @@
</spark-opts> </spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg> <arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg> <arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark> </spark>
<ok to="GenerateGraph"/> <ok to="GenerateGraph"/>
<error to="Kill"/> <error to="Kill"/>
@ -346,7 +360,40 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_graphs" to="fork_merge_claims"/> <join name="wait_graphs" to="should_drop_db"/>
<decision name="should_drop_db">
<switch>
<case to="fork_merge_claims">${wf:conf('graphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('graphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="fork_merge_claims"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_claims"> <fork name="fork_merge_claims">
<path start="merge_claims_publication"/> <path start="merge_claims_publication"/>
@ -359,7 +406,6 @@
<path start="merge_claims_relation"/> <path start="merge_claims_relation"/>
</fork> </fork>
<action name="merge_claims_publication"> <action name="merge_claims_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -375,12 +421,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -401,12 +450,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -427,12 +479,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -453,12 +508,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1920 --conf spark.sql.shuffle.partitions=1920
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -479,12 +537,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1920 --conf spark.sql.shuffle.partitions=1920
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -505,12 +566,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200 --conf spark.sql.shuffle.partitions=200
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -531,12 +595,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200 --conf spark.sql.shuffle.partitions=200
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>
@ -557,12 +624,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200 --conf spark.sql.shuffle.partitions=200
</spark-opts> </spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg> <arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg> <arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg> <arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_merge"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -20,8 +18,12 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.common.CleaningRuleMap;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.common.OafCleaner;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -62,12 +64,10 @@ public class CleaningFunctionTest {
assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication); assertTrue(p_in instanceof Publication);
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping); Publication p_out = OafCleaner.apply(p_in, mapping);
assertNotNull(p_out); assertNotNull(p_out);
assertNotNull(p_out.getPublisher());
assertNull(p_out.getPublisher().getValue());
assertEquals("und", p_out.getLanguage().getClassid()); assertEquals("und", p_out.getLanguage().getClassid());
assertEquals("Undetermined", p_out.getLanguage().getClassname()); assertEquals("Undetermined", p_out.getLanguage().getClassname());
@ -90,16 +90,6 @@ public class CleaningFunctionTest {
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out); Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid()); assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
assertNull(p_out.getPublisher());
getAuthorPids(p_defaults).forEach(pid -> {
System.out
.println(
String
.format(
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
pid.getQualifier().getClassname()));
});
// TODO add more assertions to verify the cleaned values // TODO add more assertions to verify the cleaned values
System.out.println(MAPPER.writeValueAsString(p_out)); System.out.println(MAPPER.writeValueAsString(p_out));
@ -109,7 +99,7 @@ public class CleaningFunctionTest {
*/ */
} }
private Stream<Qualifier> getAuthorPidTypes(Result pub) { private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
return pub return pub
.getAuthor() .getAuthor()
.stream() .stream()
@ -118,14 +108,6 @@ public class CleaningFunctionTest {
.map(s -> s.getQualifier()); .map(s -> s.getQualifier());
} }
private Stream<StructuredProperty> getAuthorPids(Result pub) {
return pub
.getAuthor()
.stream()
.map(a -> a.getPid())
.flatMap(p -> p.stream());
}
private List<String> vocs() throws IOException { private List<String> vocs() throws IOException {
return IOUtils return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));

View File

@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@ -27,8 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Organization;

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -20,8 +21,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
@ -31,7 +30,8 @@ public class PrepareRelationsJobTest {
public static final String SUBRELTYPE = "subRelType"; public static final String SUBRELTYPE = "subRelType";
public static final String OUTCOME = "outcome"; public static final String OUTCOME = "outcome";
public static final String SUPPLEMENT = "supplement"; public static final String PARTICIPATION = "participation";
public static final String AFFILIATION = "affiliation";
private static SparkSession spark; private static SparkSession spark;
@ -64,7 +64,7 @@ public class PrepareRelationsJobTest {
@Test @Test
public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception { public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception {
final int maxRelations = 10; final int maxRelations = 20;
PrepareRelationsJob PrepareRelationsJob
.main( .main(
new String[] { new String[] {
@ -73,7 +73,8 @@ public class PrepareRelationsJobTest {
"-outputPath", testPath.toString(), "-outputPath", testPath.toString(),
"-relPartitions", "10", "-relPartitions", "10",
"-relationFilter", "asd", "-relationFilter", "asd",
"-maxRelations", String.valueOf(maxRelations) "-sourceMaxRelations", String.valueOf(maxRelations),
"-targetMaxRelations", String.valueOf(maxRelations * 100)
}); });
Dataset<Relation> out = spark Dataset<Relation> out = spark
@ -82,19 +83,31 @@ public class PrepareRelationsJobTest {
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
.cache(); .cache();
Assertions.assertEquals(10, out.count()); Assertions.assertEquals(maxRelations, out.count());
Dataset<Row> freq = out Dataset<Row> freq = out
.toDF() .toDF()
.cube(SUBRELTYPE) .cube(SUBRELTYPE)
.count() .count()
.filter((FilterFunction<Row>) value -> !value.isNullAt(0)); .filter((FilterFunction<Row>) value -> !value.isNullAt(0));
long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count");
long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");
Assertions.assertTrue(outcome > supplement); log.info(freq.collectAsList().toString());
long outcome = getRows(freq, OUTCOME).get(0).getAs("count");
long participation = getRows(freq, PARTICIPATION).get(0).getAs("count");
long affiliation = getRows(freq, AFFILIATION).get(0).getAs("count");
Assertions.assertTrue(participation == outcome);
Assertions.assertTrue(outcome > affiliation);
Assertions.assertTrue(participation > affiliation);
Assertions.assertEquals(7, outcome); Assertions.assertEquals(7, outcome);
Assertions.assertEquals(3, supplement); Assertions.assertEquals(7, participation);
Assertions.assertEquals(6, affiliation);
}
protected List<Row> getRows(Dataset<Row> freq, String col) {
return freq.filter(freq.col(SUBRELTYPE).equalTo(col)).collectAsList();
} }
} }

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-workflows-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.clean; package eu.dnetlib.dhp.common;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
@ -7,7 +7,6 @@ import java.util.HashMap;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;

View File

@ -0,0 +1,10 @@
package eu.dnetlib.dhp.common;
public enum GraphFormat {
JSON, HIVE;
public static GraphFormat DEFAULT = JSON;
}
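
A small usage sketch for the new enum: callers that treat the format argument as optional can fall back to GraphFormat.DEFAULT. The snippet is illustrative and not taken from any specific job in this changeset.

    import java.util.Optional;

    import eu.dnetlib.dhp.common.GraphFormat;

    public class GraphFormatSketch {
        public static void main(String[] args) {
            // Use the supplied format if present, otherwise fall back to the common default (JSON).
            GraphFormat format = Optional
                .ofNullable(args.length > 0 ? args[0] : null)
                .map(GraphFormat::valueOf)
                .orElse(GraphFormat.DEFAULT);
            System.out.println("using graph format: " + format);
        }
    }
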

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.common;
import java.util.Objects;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class GraphSupport {
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <T extends Oaf> void deleteGraphTable(SparkSession spark, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
switch (graphFormat) {
case JSON:
String outPath = outputGraph + "/" + clazz.getSimpleName().toLowerCase();
removeOutputDir(spark, outPath);
break;
case HIVE:
String table = ModelSupport.tableIdentifier(outputGraph, clazz);
String sql = String.format("DROP TABLE IF EXISTS %s PURGE", table);
log.info("running SQL: '{}'", sql);
spark.sql(sql);
break;
}
}
public static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
switch (graphFormat) {
case JSON:
String type = clazz.getSimpleName().toLowerCase();
String outPath = outputGraph + "/" + type;
log.info("saving graph to path {},", outPath);
writer.option("compression", "gzip").json(outPath);
break;
case HIVE:
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
log.info("saving graph to '{}'", db_table);
writer.saveAsTable(db_table);
break;
}
}
public static <T extends Oaf> Dataset<T> readGraph(
SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
log.info("reading graph {}, format {}, class {}", graph, format, clazz);
Encoder<T> encoder = Encoders.bean(clazz);
switch (format) {
case JSON:
String path = graph + "/" + clazz.getSimpleName().toLowerCase();
log.info("reading path {}", path);
return spark
.read()
.textFile(path)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
encoder)
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
case HIVE:
String table = ModelSupport.tableIdentifier(graph, clazz);
log.info("reading table {}", table);
return spark.table(table).as(encoder);
default:
throw new IllegalStateException(String.format("format not managed: '%s'", format));
}
}
public static <T extends Oaf> Dataset<T> readGraphJSON(SparkSession spark, String graph, Class<T> clazz) {
return readGraph(spark, graph, clazz, GraphFormat.JSON);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
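
A usage sketch for the helper above: inside a Hive-enabled session, read an entity table from one graph and persist it into another, dropping the target table first. The database names are placeholders, and runWithSparkHiveSession is assumed to behave as in ResetHiveDbApplication below.

    import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;

    import eu.dnetlib.dhp.common.GraphFormat;
    import eu.dnetlib.dhp.common.GraphSupport;
    import eu.dnetlib.dhp.schema.oaf.Publication;

    public class GraphSupportUsageSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.set("hive.metastore.uris", args[0]); // metastore URI supplied by the caller

            // "beta_graph_db" and "merged_graph_db" are placeholder names, not taken from the workflows.
            runWithSparkHiveSession(conf, Boolean.TRUE, spark -> {
                Dataset<Publication> publications = GraphSupport
                    .readGraph(spark, "beta_graph_db", Publication.class, GraphFormat.HIVE);

                GraphSupport.deleteGraphTable(spark, Publication.class, "merged_graph_db", GraphFormat.HIVE);
                GraphSupport.saveGraphTable(publications, Publication.class, "merged_graph_db", GraphFormat.HIVE);
            });
        }
    }
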

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.clean; package eu.dnetlib.dhp.common;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Field; import java.lang.reflect.Field;

View File

@ -1,11 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw.common; package eu.dnetlib.dhp.common;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -13,15 +9,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils { public class OafMapperUtils {

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.common;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ResetHiveDbApplication {
private static final Logger log = LoggerFactory.getLogger(ResetHiveDbApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ResetHiveDbApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/common/reset_hive_db_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String dbName = Optional
.ofNullable(parser.get("dbName"))
.orElseThrow(() -> new IllegalArgumentException("missing DB name"));
log.info("dbName: {}", dbName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
runSQL(spark, String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
runSQL(spark, String.format("CREATE DATABASE %s", dbName));
});
}
protected static void runSQL(SparkSession spark, String sql) {
log.info("running SQL '{}'", sql);
spark.sqlContext().sql(sql);
}
}
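
The class is driven entirely by the arguments declared in reset_hive_db_parameters.json below. As an invocation sketch, with placeholder values for the DB name and metastore URI (the workflows pass ${outputGraph} and ${hiveMetastoreUris} instead):

    import eu.dnetlib.dhp.common.ResetHiveDbApplication;

    public class ResetHiveDbInvocationSketch {
        public static void main(String[] args) throws Exception {
            ResetHiveDbApplication
                .main(
                    new String[] {
                        "--isSparkSessionManaged", "true",
                        "--dbName", "my_graph_db",                            // placeholder
                        "--hiveMetastoreUris", "thrift://example.host:9083"   // placeholder
                    });
        }
    }
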

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.raw.common; package eu.dnetlib.dhp.common;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.raw.common; package eu.dnetlib.dhp.common;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.raw.common; package eu.dnetlib.dhp.common;
import java.io.Serializable; import java.io.Serializable;

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "dbName",
"paramDescription": "the graph db name",
"paramRequired": true
}
]

View File

@ -17,6 +17,7 @@
<modules> <modules>
<module>dhp-workflow-profiles</module> <module>dhp-workflow-profiles</module>
<module>dhp-workflows-common</module>
<module>dhp-aggregation</module> <module>dhp-aggregation</module>
<module>dhp-distcp</module> <module>dhp-distcp</module>
<module>dhp-actionmanager</module> <module>dhp-actionmanager</module>
@ -269,7 +270,7 @@
oozie.wf.application.path,projectVersion,oozie.use.system.libpath, oozie.wf.application.path,projectVersion,oozie.use.system.libpath,
oozieActionShareLibForSpark1,spark1YarnHistoryServerAddress,spark1EventLogDir, oozieActionShareLibForSpark1,spark1YarnHistoryServerAddress,spark1EventLogDir,
oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir, oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir,
sparkSqlWarehouseDir sparkSqlWarehouseDir,hiveMetastoreUris
</include> </include>
<includeSystemProperties>true</includeSystemProperties> <includeSystemProperties>true</includeSystemProperties>
<includePropertyKeysFromFiles> <includePropertyKeysFromFiles>