Compare commits

...

16 Commits

Author SHA1 Message Date
Claudio Atzori a5f23d8a4c WIP: materialize graph as Hive DB 4 years ago
Claudio Atzori cce21eafc2 WIP: materialize graph as Hive DB, configured spark actions to include hive support] 4 years ago
Claudio Atzori fde8738ed2 added hiveMetastoreUris parameter to default parameters 4 years ago
Claudio Atzori 3be0e5c2cd Merge branch 'master' into graph_db 4 years ago
Claudio Atzori f3ce97ecf9 WIP: materialize graph as Hive DB, mergeAggregatorGraphs [added workflow node to drop the DB] 4 years ago
Claudio Atzori 771bf8bcc4 WIP: materialize graph as Hive DB, mergeAggregatorGraphs 4 years ago
Claudio Atzori 0da1d2c0c9 introduced GraphFormat.DEFAULT, indicating a common value to be used across the workflows 4 years ago
Claudio Atzori 1fcc28968e integrated changes from master 4 years ago
Claudio Atzori da2f8af72d adjusted MergeClaimsApplication param specs 4 years ago
Claudio Atzori 4ff184973b code formatting 4 years ago
Claudio Atzori 9e594cf4c2 WIP: materialize graph as Hive DB, aggregator graph 4 years ago
Claudio Atzori ee0b2191f8 adjusted test assertions to reflect update ordering defined in SortableRelationKey 4 years ago
Claudio Atzori fd289d389c adjusted dedup stats block test assertions to reflect updated configuration 4 years ago
Claudio Atzori 2dbac631c9 WIP: factoring out utilities into dhp-workflows-common 4 years ago
Claudio Atzori 91811ab43a Merge branch 'master' into graph_db 4 years ago
Claudio Atzori b0b6c6bd47 WIP dhp-workflows-common 4 years ago

@ -195,10 +195,10 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(3432, orgs_simrel);
assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel);
assertEquals(6944, pubs_simrel);
assertEquals(318, sw_simrel);
assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel);
assertEquals(6746, orp_simrel);
}
@Test
@ -344,10 +344,10 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(1276, orgs_mergerel);
assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel);
assertEquals(1418, pubs_mergerel);
assertEquals(276, sw_mergerel);
assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel);
assertEquals(716, orp_mergerel);
}
@Test
@ -391,8 +391,8 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(82, orgs_deduprecord);
assertEquals(66, pubs_deduprecord);
assertEquals(51, sw_deduprecord);
assertEquals(65, pubs_deduprecord);
assertEquals(50, sw_deduprecord);
assertEquals(96, ds_deduprecord);
assertEquals(89, orp_deduprecord);
}
@ -473,11 +473,11 @@ public class SparkDedupTest implements Serializable {
.distinct()
.count();
assertEquals(897, publications);
assertEquals(896, publications);
assertEquals(835, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
assertEquals(200, softwares);
assertEquals(199, softwares);
assertEquals(388, dataset);
assertEquals(517, otherresearchproduct);
@ -533,7 +533,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4866, relations);
assertEquals(4828, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark

@ -168,10 +168,10 @@ public class SparkStatsTest implements Serializable {
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count();
assertEquals(121, orgs_blocks);
assertEquals(110, pubs_blocks);
assertEquals(21, sw_blocks);
assertEquals(67, ds_blocks);
assertEquals(55, orp_blocks);
assertEquals(549, orgs_blocks);
assertEquals(868, pubs_blocks);
assertEquals(473, sw_blocks);
assertEquals(523, ds_blocks);
assertEquals(756, orp_blocks);
}
}

@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {

@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {

@ -29,8 +29,7 @@
},
"pace": {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {

@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {

@ -71,6 +71,11 @@
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

@ -1,9 +1,10 @@
package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.GraphSupport.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedInputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@ -14,7 +15,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,10 +22,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.*;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -53,50 +51,64 @@ public class CleanGraphSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String inputGraph = parser.get("inputGraph");
log.info("inputGraph: {}", inputGraph);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
GraphFormat inputGraphFormat = Optional
.ofNullable(parser.get("inputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("inputGraphFormat: {}", inputGraphFormat);
GraphFormat outputGraphFormat = Optional
.ofNullable(parser.get("outputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("outputGraphFormat: {}", outputGraphFormat);
String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
SparkConf conf = new SparkConf();
runWithSparkSession(
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
});
spark -> cleanGraphTable(spark, vocs, inputGraph, inputGraphFormat, outputGraph, outputGraphFormat, clazz));
}
private static <T extends Oaf> void fixGraphTable(
private static <T extends Oaf> void cleanGraphTable(
SparkSession spark,
VocabularyGroup vocs,
String inputPath,
Class<T> clazz,
String outputPath) {
String inputGraph,
GraphFormat inputGraphFormat,
String outputGraph,
GraphFormat outputGraphFormat,
Class<T> clazz) {
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
readTableFromPath(spark, inputPath, clazz)
Dataset<T> cleaned = readGraph(spark, inputGraph, clazz, inputGraphFormat)
.map((MapFunction<T, T>) value -> fixVocabularyNames(value), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.map((MapFunction<T, T>) value -> fixDefaults(value), Encoders.bean(clazz));
saveGraphTable(cleaned, clazz, outputGraph, outputGraphFormat);
}
protected static <T extends Oaf> T fixVocabularyNames(T value) {

@ -1,6 +1,9 @@
package eu.dnetlib.dhp.oa.graph.merge;
import static eu.dnetlib.dhp.common.GraphSupport.deleteGraphTable;
import static eu.dnetlib.dhp.common.GraphSupport.saveGraphTable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
@ -20,7 +23,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.common.GraphSupport;
import eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@ -35,8 +39,6 @@ public class MergeGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
public static void main(String[] args) throws Exception {
@ -60,46 +62,61 @@ public class MergeGraphSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String betaInputPath = parser.get("betaInputPath");
log.info("betaInputPath: {}", betaInputPath);
String betaInputGraph = parser.get("betaInputGraph");
log.info("betaInputGraph: {}", betaInputGraph);
String prodInputGraph = parser.get("prodInputGraph");
log.info("prodInputGraph: {}", prodInputGraph);
String prodInputPath = parser.get("prodInputPath");
log.info("prodInputPath: {}", prodInputPath);
String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
GraphFormat inputGraphFormat = Optional
.ofNullable(parser.get("inputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("inputGraphFormat: {}", inputGraphFormat);
GraphFormat outputGraphFormat = Optional
.ofNullable(parser.get("outputGraphFormat"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("outputGraphFormat: {}", outputGraphFormat);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
});
spark -> mergeGraphTable(
spark, priority, betaInputGraph, clazz, prodInputGraph, clazz, outputGraph, inputGraphFormat,
outputGraphFormat));
}
private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
SparkSession spark,
String priority,
String betaInputPath,
String prodInputPath,
Class<P> p_clazz,
String betaInputGraph,
Class<B> b_clazz,
String outputPath) {
String prodInputGraph,
Class<P> p_clazz,
String outputGraph, GraphFormat inputGraphFormat, GraphFormat outputGraphFormat) {
Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
Dataset<Tuple2<String, B>> beta = readGraph(spark, betaInputGraph, b_clazz, inputGraphFormat);
Dataset<Tuple2<String, P>> prod = readGraph(spark, prodInputGraph, p_clazz, inputGraphFormat);
prod
Dataset<P> merged = prod
.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
@ -112,11 +129,9 @@ public class MergeGraphSparkJob {
return mergeWithPriorityToPROD(p, b);
}
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.filter((FilterFunction<P>) Objects::nonNull);
saveGraphTable(merged, p_clazz, outputGraph, outputGraphFormat);
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
@ -139,24 +154,16 @@ public class MergeGraphSparkJob {
return null;
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, Tuple2<String, T>>) value -> {
final T t = OBJECT_MAPPER.readValue(value, clazz);
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
private static <T extends Oaf> Dataset<Tuple2<String, T>> readGraph(
SparkSession spark, String inputGraph, Class<T> clazz, GraphFormat inputGraphFormat) {
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
log.info("Reading Graph table from: {}", inputGraph);
return GraphSupport
.readGraph(spark, inputGraph, clazz, inputGraphFormat)
.map((MapFunction<T, Tuple2<String, T>>) t -> {
final String id = ModelSupport.idFn().apply(t);
return new Tuple2<>(id, t);
}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
}
}

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.*;
@ -12,7 +12,7 @@ import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.LicenseComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;

@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
@ -70,10 +70,10 @@ public class GenerateEntitiesApplication {
final String targetPath = parser.get("targetPath");
log.info("targetPath: {}", targetPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
final SparkConf conf = new SparkConf();

@ -1,7 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.common.GraphSupport.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Objects;
import java.util.Optional;
@ -9,29 +10,23 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Tuple2;
public class MergeClaimsApplication {
private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@ -53,49 +48,47 @@ public class MergeClaimsApplication {
final String claimsGraphPath = parser.get("claimsGraphPath");
log.info("claimsGraphPath: {}", claimsGraphPath);
final String outputRawGaphPath = parser.get("outputRawGaphPath");
log.info("outputRawGaphPath: {}", outputRawGaphPath);
final String outputGraph = parser.get("outputGraph");
log.info("outputGraph: {}", outputGraph);
final GraphFormat format = Optional
.ofNullable(parser.get("format"))
.map(GraphFormat::valueOf)
.orElse(GraphFormat.DEFAULT);
log.info("graphFormat: {}", format);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
String type = clazz.getSimpleName().toLowerCase();
String rawPath = rawGraphPath + "/" + type;
String claimPath = claimsGraphPath + "/" + type;
String outPath = outputRawGaphPath + "/" + type;
removeOutputDir(spark, outPath);
mergeByType(spark, rawPath, claimPath, outPath, clazz);
});
spark -> mergeByType(spark, rawGraphPath, claimsGraphPath, outputGraph, format, clazz));
}
private static <T extends Oaf> void mergeByType(
SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
SparkSession spark, String rawPath, String claimPath, String outputGraph, GraphFormat format, Class<T> clazz) {
Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Tuple2<String, T>> claim = jsc
.broadcast(readFromPath(spark, claimPath, clazz))
.getValue()
Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
raw
Dataset<T> merged = raw
.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
.map(
(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
@ -107,28 +100,9 @@ public class MergeClaimsApplication {
: opClaim.isPresent() ? opClaim.get()._2() : null;
},
Encoders.bean(clazz))
.filter(Objects::nonNull)
.map(
(MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value),
Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outPath);
}
.filter(Objects::nonNull);
private static <T extends Oaf> Dataset<T> readFromPath(
SparkSession spark, String path, Class<T> clazz) {
return spark
.read()
.textFile(path)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
saveGraphTable(merged, clazz, outputGraph, format);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

@ -1,15 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@ -53,9 +45,9 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;

@ -1,9 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.ArrayList;
@ -18,7 +16,7 @@ import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;

@ -1,9 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.ArrayList;
@ -19,7 +17,7 @@ import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;

@ -15,4 +15,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
</configuration>

@ -2,18 +2,32 @@
<parameters>
<property>
<name>graphInputPath</name>
<description>the input path to read graph content</description>
<name>inputGraph</name>
<description>the input graph name (or path)</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the target path to store cleaned graph</description>
<name>outputGraph</name>
<description>the output graph name (or path)</description>
</property>
<property>
<name>isLookupUrl</name>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>inputGraphFormat</name>
<value>HIVE</value>
<description>the input graph data format</description>
</property>
<property>
<name>outputGraphFormat</name>
<value>HIVE</value>
<description>the output graph data format</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -48,14 +62,70 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>spark 2.* db directory location</value>
</property>
</parameters>
<start to="fork_clean_graph"/>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="should_drop_db"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="should_drop_db">
<switch>
<case to="fork_clean_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="fork_clean_graph"/>
<error to="Kill"/>
</action>
<fork name="fork_clean_graph">
<path start="clean_publication"/>
<path start="clean_dataset"/>
@ -82,12 +152,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -108,12 +182,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -134,12 +212,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -160,12 +242,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -186,12 +272,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -212,12 +302,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -238,12 +332,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -264,12 +362,16 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--inputGraph</arg><arg>${inputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>

@ -19,7 +19,7 @@
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url of the ISLookupService",
"paramRequired": true
}

@ -11,6 +11,10 @@
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
@ -19,8 +23,4 @@
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

@ -51,6 +51,10 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>spark 2.* db directory location</value>
</property>
</parameters>
<global>

@ -6,20 +6,32 @@
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramName": "ig",
"paramLongName": "inputGraph",
"paramDescription": "the input graph name (or path)",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to store the output graph",
"paramName": "og",
"paramLongName": "outputGraph",
"paramDescription": "the output graph name (or path)",
"paramRequired": true
},
{
"paramName": "igf",
"paramLongName": "inputGraphFormat",
"paramDescription": "the input graph data format",
"paramRequired": true
},
{
"paramName": "ogf",
"paramLongName": "outputGraphFormat",
"paramDescription": "the output graph data format",
"paramRequired": true
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramLongName": "isLookUpUrl",
"paramDescription": "url to the ISLookup Service",
"paramRequired": true
},
@ -28,5 +40,11 @@
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
}
]

@ -2,22 +2,36 @@
<parameters>
<property>
<name>betaInputGgraphPath</name>
<description>the beta graph root path</description>
<name>betaInputGraph</name>
<description>the beta graph name (or path)</description>
</property>
<property>
<name>prodInputGgraphPath</name>
<description>the production graph root path</description>
<name>prodInputGraph</name>
<description>the production graph name (or path)</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the output merged graph root path</description>
<name>outputGraph</name>
<description>the merged graph name (or path)</description>
</property>
<property>
<name>priority</name>
<description>decides from which infrastructure the content must win in case of ID clash</description>
</property>
<property>
<name>inputGraphFormat</name>
<value>HIVE</value>
<description>the input graph data format</description>
</property>
<property>
<name>outputGraphFormat</name>
<value>HIVE</value>
<description>the output graph data format</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -52,14 +66,51 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>spark 2.* db directory location</value>
</property>
</parameters>
<start to="fork_merge_graph"/>
<start to="should_drop_db"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="should_drop_db">
<switch>
<case to="fork_merge_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="fork_merge_graph"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_graph">
<path start="merge_publication"/>
<path start="merge_dataset"/>
@ -86,13 +137,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -113,13 +168,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -140,13 +199,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -167,13 +230,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -194,13 +261,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -221,13 +292,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -248,13 +323,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -275,13 +354,17 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--betaInputGraph</arg><arg>${betaInputGraph}</arg>
<arg>--prodInputGraph</arg><arg>${prodInputGraph}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--inputGraphFormat</arg><arg>${inputGraphFormat}</arg>
<arg>--outputGraphFormat</arg><arg>${outputGraphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>

@ -6,21 +6,27 @@
"paramRequired": false
},
{
"paramName": "rgp",
"paramLongName": "rawGraphPath",
"paramDescription": "the raw graph path",
"paramName": "og",
"paramLongName": "outputGraph",
"paramDescription": "the output graph (db name | path)",
"paramRequired": true
},
{
"paramName": "gf",
"paramLongName": "graphFormat",
"paramDescription": "graph save format (json|parquet)",
"paramRequired": true
},
{
"paramName": "cgp",
"paramLongName": "claimsGraphPath",
"paramDescription": "the path of the claims graph",
"paramDescription": "the path of the claims graph, from the working directory",
"paramRequired": true
},
{
"paramName": "ogp",
"paramLongName": "outputRawGaphPath",
"paramDescription": "the path of output graph, combining raw and claims",
"paramName": "rgp",
"paramLongName": "rawGraphPath",
"paramDescription": "the path of raw graph, from the working directory",
"paramRequired": true
},
{
@ -28,5 +34,11 @@
"paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
}
]

@ -7,20 +7,32 @@
},
{
"paramName": "bin",
"paramLongName": "betaInputPath",
"paramDescription": "the beta graph root path",
"paramLongName": "betaInputGraph",
"paramDescription": "the beta graph name (or path)",
"paramRequired": true
},
{
"paramName": "pin",
"paramLongName": "prodInputPath",
"paramDescription": "the production graph root path",
"paramLongName": "prodInputGraph",
"paramDescription": "the production graph name (or path)",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramLongName": "outputGraph",
"paramDescription": "the output merged graph (or path)",
"paramRequired": true
},
{
"paramName": "igf",
"paramLongName": "inputGraphFormat",
"paramDescription": "the format of the input graphs",
"paramRequired": true
},
{
"paramName": "ogf",
"paramLongName": "outputGraphFormat",
"paramDescription": "the format of the output merged graph",
"paramRequired": true
},
{
@ -34,5 +46,11 @@
"paramLongName": "priority",
"paramDescription": "decides from which infrastructure the content must win in case of ID clash",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
}
]

@ -2,8 +2,13 @@
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
<name>outputGraph</name>
<description>the target graph name (or path)</description>
</property>
<property>
<name>graphFormat</name>
<value>HIVE</value>
<description>the graph data format</description>
</property>
<property>
<name>reuseContent</name>
@ -40,7 +45,7 @@
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
@ -48,6 +53,11 @@
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -82,6 +92,10 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>spark 2.* db directory location</value>
</property>
</parameters>
<global>
@ -132,7 +146,7 @@
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--action</arg><arg>claims</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
@ -185,7 +199,7 @@
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
@ -269,7 +283,7 @@
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims,${contentPath}/oaf_records_invisible</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities_claim</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="GenerateGraph_claims"/>
<error to="Kill"/>
@ -316,7 +330,7 @@
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
@ -346,7 +360,40 @@
<error to="Kill"/>
</action>
<join name="wait_graphs" to="fork_merge_claims"/>
<join name="wait_graphs" to="should_drop_db"/>
<decision name="should_drop_db">
<switch>
<case to="fork_merge_claims">${wf:conf('graphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('graphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="fork_merge_claims"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_claims">
<path start="merge_claims_publication"/>
@ -359,7 +406,6 @@
<path start="merge_claims_relation"/>
</fork>
<action name="merge_claims_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -375,12 +421,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -401,12 +450,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -427,12 +479,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -453,12 +508,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1920
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -479,12 +537,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1920
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -505,12 +566,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -531,12 +595,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
@ -557,12 +624,15 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--outputGraph</arg><arg>${outputGraph}</arg>
<arg>--graphFormat</arg><arg>${graphFormat}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>

@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@ -20,8 +18,12 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.common.CleaningRuleMap;
import eu.dnetlib.dhp.common.OafCleaner;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -62,12 +64,10 @@ public class CleaningFunctionTest {
assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication);
Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping);
Publication p_out = OafCleaner.apply(p_in, mapping);
assertNotNull(p_out);
assertNotNull(p_out.getPublisher());
assertNull(p_out.getPublisher().getValue());
assertEquals("und", p_out.getLanguage().getClassid());
assertEquals("Undetermined", p_out.getLanguage().getClassname());
@ -90,16 +90,6 @@ public class CleaningFunctionTest {
Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
assertNull(p_out.getPublisher());
getAuthorPids(p_defaults).forEach(pid -> {
System.out
.println(
String
.format(
"%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(),
pid.getQualifier().getClassname()));
});
// TODO add more assertions to verity the cleaned values
System.out.println(MAPPER.writeValueAsString(p_out));
@ -109,7 +99,7 @@ public class CleaningFunctionTest {
*/
}
private Stream<Qualifier> getAuthorPidTypes(Result pub) {
private Stream<Qualifier> getAuthorPidTypes(Publication pub) {
return pub
.getAuthor()
.stream()
@ -118,14 +108,6 @@ public class CleaningFunctionTest {
.map(s -> s.getQualifier());
}
private Stream<StructuredProperty> getAuthorPids(Result pub) {
return pub
.getAuthor()
.stream()
.map(a -> a.getPid())
.flatMap(p -> p.stream());
}
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));

@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;

@ -27,8 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.OafMapperUtils;
import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
@ -20,8 +21,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
@ -31,7 +30,8 @@ public class PrepareRelationsJobTest {
public static final String SUBRELTYPE = "subRelType";
public static final String OUTCOME = "outcome";
public static final String SUPPLEMENT = "supplement";
public static final String PARTICIPATION = "participation";
public static final String AFFILIATION = "affiliation";
private static SparkSession spark;
@ -64,7 +64,7 @@ public class PrepareRelationsJobTest {
@Test
public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception {
final int maxRelations = 10;
final int maxRelations = 20;
PrepareRelationsJob
.main(
new String[] {
@ -73,7 +73,8 @@ public class PrepareRelationsJobTest {
"-outputPath", testPath.toString(),
"-relPartitions", "10",
"-relationFilter", "asd",
"-maxRelations", String.valueOf(maxRelations)
"-sourceMaxRelations", String.valueOf(maxRelations),
"-targetMaxRelations", String.valueOf(maxRelations * 100)
});
Dataset<Relation> out = spark
@ -82,19 +83,31 @@ public class PrepareRelationsJobTest {
.as(Encoders.bean(Relation.class))
.cache();
Assertions.assertEquals(10, out.count());
Assertions.assertEquals(maxRelations, out.count());
Dataset<Row> freq = out
.toDF()
.cube(SUBRELTYPE)
.count()
.filter((FilterFunction<Row>) value -> !value.isNullAt(0));
long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count");
long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");
Assertions.assertTrue(outcome > supplement);
log.info(freq.collectAsList().toString());
long outcome = getRows(freq, OUTCOME).get(0).getAs("count");
long participation = getRows(freq, PARTICIPATION).get(0).getAs("count");
long affiliation = getRows(freq, AFFILIATION).get(0).getAs("count");
Assertions.assertTrue(participation == outcome);
Assertions.assertTrue(outcome > affiliation);
Assertions.assertTrue(participation > affiliation);
Assertions.assertEquals(7, outcome);
Assertions.assertEquals(3, supplement);
Assertions.assertEquals(7, participation);
Assertions.assertEquals(6, affiliation);
}
protected List<Row> getRows(Dataset<Row> freq, String col) {
return freq.filter(freq.col(SUBRELTYPE).equalTo(col)).collectAsList();
}
}

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-workflows-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.clean;
package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.HashMap;
@ -7,7 +7,6 @@ import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Qualifier;

@ -0,0 +1,10 @@
package eu.dnetlib.dhp.common;
public enum GraphFormat {
JSON, HIVE;
public static GraphFormat DEFAULT = JSON;
}

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.common;
import java.util.Objects;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class GraphSupport {
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <T extends Oaf> void deleteGraphTable(SparkSession spark, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
switch (graphFormat) {
case JSON:
String outPath = outputGraph + "/" + clazz.getSimpleName().toLowerCase();
removeOutputDir(spark, outPath);
break;
case HIVE:
String table = ModelSupport.tableIdentifier(outputGraph, clazz);
String sql = String.format("DROP TABLE IF EXISTS %s PURGE", table);
log.info("running SQL: '{}'", sql);
spark.sql(sql);
break;
}
}
public static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
GraphFormat graphFormat) {
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
switch (graphFormat) {
case JSON:
String type = clazz.getSimpleName().toLowerCase();
String outPath = outputGraph + "/" + type;
log.info("saving graph to path {},", outPath);
writer.option("compression", "gzip").json(outPath);
break;
case HIVE:
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
log.info("saving graph to '{}'", db_table);
writer.saveAsTable(db_table);
break;
}
}
public static <T extends Oaf> Dataset<T> readGraph(
SparkSession spark, String graph, Class<T> clazz, GraphFormat format) {
log.info("reading graph {}, format {}, class {}", graph, format, clazz);
Encoder<T> encoder = Encoders.bean(clazz);
switch (format) {
case JSON:
String path = graph + "/" + clazz.getSimpleName().toLowerCase();
log.info("reading path {}", path);
return spark
.read()
.textFile(path)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
encoder)
.filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
case HIVE:
String table = ModelSupport.tableIdentifier(graph, clazz);
log.info("reading table {}", table);
return spark.table(table).as(encoder);
default:
throw new IllegalStateException(String.format("format not managed: '%s'", format));
}
}
public static <T extends Oaf> Dataset<T> readGraphJSON(SparkSession spark, String graph, Class<T> clazz) {
return readGraph(spark, graph, clazz, GraphFormat.JSON);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.clean;
package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.lang.reflect.Field;

@ -1,11 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
package eu.dnetlib.dhp.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
@ -13,15 +9,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.common;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ResetHiveDbApplication {
private static final Logger log = LoggerFactory.getLogger(ResetHiveDbApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ResetHiveDbApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/common/reset_hive_db_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String dbName = Optional
.ofNullable(parser.get("dbName"))
.orElseThrow(() -> new IllegalArgumentException("missing DB name"));
log.info("dbName: {}", dbName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
runSQL(spark, String.format("DROP DATABASE IF EXISTS %s CASCADE", dbName));
runSQL(spark, String.format("CREATE DATABASE %s", dbName));
});
}
protected static void runSQL(SparkSession spark, String sql) {
log.info("running SQL '{}'", sql);
spark.sqlContext().sql(sql);
}
}

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.HashMap;

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "dbName",
"paramDescription": "the graph db name",
"paramRequired": true
}
]

@ -17,6 +17,7 @@
<modules>
<module>dhp-workflow-profiles</module>
<module>dhp-workflows-common</module>
<module>dhp-aggregation</module>
<module>dhp-distcp</module>
<module>dhp-actionmanager</module>
@ -269,7 +270,7 @@
oozie.wf.application.path,projectVersion,oozie.use.system.libpath,
oozieActionShareLibForSpark1,spark1YarnHistoryServerAddress,spark1EventLogDir,
oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir,
sparkSqlWarehouseDir
sparkSqlWarehouseDir,hiveMetastoreUris
</include>
<includeSystemProperties>true</includeSystemProperties>
<includePropertyKeysFromFiles>

Loading…
Cancel
Save