From b021b8a2e19b07659f0e2a726551e94caae892c0 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Mon, 24 Feb 2020 10:15:55 +0100 Subject: [PATCH] Added index wf --- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 2 +- .../dedup/SparkPropagateRelationsJob.java | 1 - .../dnetlib/dedup/SparkUpdateEntityJob.java | 5 +- .../dedup_delete_by_inference_parameters.json | 19 +- .../dnetlib/dhp/dedup/oozie_app/workflow.xml | 79 ++++- dhp-workflows/dhp-graph-provision/pom.xml | 45 +++ .../dnetlib/dhp/provision/ProvisionUtil.java | 47 +++ .../dhp/provision/RelatedItemInfo.java | 64 ++++ .../provision/SparkExtractRelationCount.java | 74 +++++ .../dhp/provision/SparkGenerateSummary.java | 57 ++++ .../provision/SparkIndexCollectionOnES.java | 49 +++ .../provision/scholix/CollectedFromType.java | 44 +++ .../dhp/provision/scholix/SchemeValue.java | 33 ++ .../dhp/provision/scholix/ScholixSummary.java | 289 ++++++++++++++++++ .../provision/scholix/TypedIdentifier.java | 32 ++ .../dhp/provision/scholix/Typology.java | 9 + .../provision/oozie_app/config-default.xml | 10 + .../provision/oozie_app/workflow.xml | 100 ++++++ .../eu/dnetlib/dhp/provision/index_on_es.json | 20 ++ .../input_generate_summary_parameters.json | 20 ++ .../input_related_entities_parameters.json | 20 ++ .../dhp/provision/ExtractInfoTest.java | 48 +++ .../eu/dnetlib/dhp/provision/record.json | 1 + dhp-workflows/pom.xml | 1 + pom.xml | 8 + 25 files changed, 1057 insertions(+), 20 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/pom.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/CollectedFromType.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/SchemeValue.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixSummary.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/TypedIdentifier.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Typology.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_related_entities_parameters.json create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/provision/record.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 5de2b70ff..ea8943efd 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -65,7 +65,7 @@ public class DHPUtils { return (String) o; if (o instanceof JSONArray && ((JSONArray) o).size() > 0) return (String) ((JSONArray) o).get(0); - return ""; + return o.toString(); } catch (Exception e) { return ""; } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java index 9a9abebe6..52c9983f0 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java @@ -27,7 +27,6 @@ public class SparkPropagateRelationsJob { SOURCE, TARGET } - final static String IDJSONPATH = "$.id"; final static String SOURCEJSONPATH = "$.source"; final static String TARGETJSONPATH = "$.target"; diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java index e7bb4f9c2..1381633e5 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java @@ -44,6 +44,7 @@ public class SparkUpdateEntityJob { final String mergeRelPath = parser.get("mergeRelPath"); final String dedupRecordPath = parser.get("dedupRecordPath"); final String entity = parser.get("entity"); + final String destination = parser.get("targetPath"); final Dataset df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); final JavaPairRDD mergedIds = df @@ -63,7 +64,7 @@ public class SparkUpdateEntityJob { .mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s)) .leftOuterJoin(mergedIds) .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1()) - .saveAsTextFile(entityPath + "_new", GzipCodec.class); + .saveAsTextFile(destination, GzipCodec.class); } else { final JavaRDD dedupEntity = sc.textFile(dedupRecordPath); JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s)); @@ -86,7 +87,7 @@ public class SparkUpdateEntityJob { JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1()); - map.union(dedupEntity).saveAsTextFile(entityPath + "_new", GzipCodec.class); + map.union(dedupEntity).saveAsTextFile(destination, GzipCodec.class); } diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json index fecc666c4..69428a296 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json @@ -22,10 +22,17 @@ "paramLongName": "dedupRecordPath", "paramDescription": "the inputPath of dedup record", "paramRequired": true - }, { - "paramName": "e", - "paramLongName": "entity", - "paramDescription": "the type of entity", - "paramRequired": true -} + }, + { + "paramName": "e", + "paramLongName": "entity", + "paramDescription": "the type of entity", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the targetPath", + "paramRequired": true + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml index 89ebb17ff..995ef076a 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml @@ -26,7 +26,7 @@ - + @@ -55,8 +55,7 @@ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --num-executors 100 - --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + ${sparkExtraOPT} -mtyarn-cluster --sourcePath${sourcePath} @@ -80,8 +79,7 @@ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --num-executors 100 - --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + ${sparkExtraOPT} -mtyarn-cluster --sourcePath${sourcePath} @@ -105,8 +103,7 @@ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --num-executors 100 - --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + ${sparkExtraOPT} -mtyarn-cluster --sourcePath${sourcePath} @@ -130,14 +127,76 @@ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --num-executors 100 - --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + ${sparkExtraOPT} -mtyarn-cluster --mergeRelPath${targetPath}/${entity}/mergeRel --relationPath${sourcePath}/relation - --targetRelPath${targetPath}/${entity}/relation_updated + --targetRelPath${targetPath}/${entity}/relation_propagated + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Update ${entity} and add DedupRecord + eu.dnetlib.dedup.SparkUpdateEntityJob + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + -mtyarn-cluster + --entityPath${sourcePath}/${entity} + --mergeRelPath${targetPath}/${entity}/mergeRel + --entity${entity} + --dedupRecordPath${targetPath}/${entity}/dedup_records + --targetPath${targetPath}/${entity}/updated_record + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Update ${entity} set deleted by Inference + eu.dnetlib.dedup.SparkUpdateEntityJob + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + -mtyarn-cluster + --entityPath${targetPath}/${entity}/relation_propagated + --mergeRelPath${targetPath}/${entity}/mergeRel + --entityrelation + --dedupRecordPath${targetPath}/${entity}/dedup_records + --targetPath${targetPath}/${entity}/updated_relation + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml new file mode 100644 index 000000000..382cf26f4 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -0,0 +1,45 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.0.5-SNAPSHOT + + 4.0.0 + + dhp-graph-provision + + + + org.apache.spark + spark-core_2.11 + + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + + + org.elasticsearch + elasticsearch-hadoop + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java new file mode 100644 index 000000000..db14aa671 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java @@ -0,0 +1,47 @@ +package eu.dnetlib.dhp.provision; + +import eu.dnetlib.dhp.provision.scholix.Typology; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.lang3.StringUtils; + +public class ProvisionUtil { + + public final static String deletedByInferenceJPATH = "$.dataInfo.deletedbyinference"; + public final static String TARGETJSONPATH = "$.target"; + public final static String SOURCEJSONPATH = "$.source"; + + public static RelatedItemInfo getItemType(final String item, final String idPath) { + String targetId = DHPUtils.getJPathString(idPath, item); + switch (StringUtils.substringBefore(targetId, "|")) { + case "50": + return new RelatedItemInfo().setRelatedPublication(1); + case "60": + return new RelatedItemInfo().setRelatedDataset(1); + case "70": + return new RelatedItemInfo().setRelatedUnknown(1); + default: + throw new RuntimeException("Unknonw target ID"); + + } + + } + + public static Boolean isNotDeleted(final String item) { + return !"true".equalsIgnoreCase(DHPUtils.getJPathString(deletedByInferenceJPATH, item)); + } + + public static Typology getItemTypeFromId(String id) { + + switch (StringUtils.substringBefore(id, "|")) { + case "50": + return Typology.publication; + case "60": + return Typology.dataset; + case "70": + return Typology.unknown; + default: + throw new RuntimeException("Unknonw ID type"); + + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java new file mode 100644 index 000000000..bf89b3115 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java @@ -0,0 +1,64 @@ +package eu.dnetlib.dhp.provision; + +import java.io.Serializable; + +/** + * This class models the information of related items + */ + +public class RelatedItemInfo implements Serializable { + + private String id; + + private int relatedDataset = 0; + + private int relatedPublication = 0; + + private int relatedUnknown = 0; + + + public String getId() { + return id; + } + + public RelatedItemInfo setId(String id) { + this.id = id; + return this; + } + + public RelatedItemInfo add(RelatedItemInfo other) { + if (other != null) { + relatedDataset += other.getRelatedDataset(); + relatedPublication += other.getRelatedPublication(); + relatedUnknown += other.getRelatedUnknown(); + } + return this; + } + + public int getRelatedDataset() { + return relatedDataset; + } + + public RelatedItemInfo setRelatedDataset(int relatedDataset) { + this.relatedDataset = relatedDataset; + return this; + } + + public int getRelatedPublication() { + return relatedPublication; + } + + public RelatedItemInfo setRelatedPublication(int relatedPublication) { + this.relatedPublication = relatedPublication; + return this; + } + + public int getRelatedUnknown() { + return relatedUnknown; + } + + public RelatedItemInfo setRelatedUnknown(int relatedUnknown) { + this.relatedUnknown = relatedUnknown; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java new file mode 100644 index 000000000..d3991448f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java @@ -0,0 +1,74 @@ +package eu.dnetlib.dhp.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.utils.DHPUtils; +import net.minidev.json.JSONArray; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + + +/** + * SparkExtractRelationCount is a spark job that takes in input relation RDD + * and retrieve for each item in relation which are the number of + * - Related Dataset + * - Related Publication + * - Related Unknown + */ +public class SparkExtractRelationCount { + + + + + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkExtractRelationCount.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_related_entities_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkExtractRelationCount.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + + final String workingDirPath = parser.get("workingDirPath"); + + final String relationPath = parser.get("relationPath"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + sc.textFile(relationPath) + // We start to Filter the relation not deleted by Inference + .filter(ProvisionUtil::isNotDeleted) + // Then we create a PairRDD + .mapToPair((PairFunction) f + -> new Tuple2<>(DHPUtils.getJPathString(ProvisionUtil.SOURCEJSONPATH, f), ProvisionUtil.getItemType(f, ProvisionUtil.TARGETJSONPATH))) + //We reduce and sum the number of Relations + .reduceByKey((Function2) (v1, v2) -> { + if (v1 == null && v2 == null) + return new RelatedItemInfo(); + return v1 != null ? v1.add(v2) : v2; + }) + //Set the source Id in RelatedItem object + .map(k -> k._2().setId(k._1())) + // Convert to JSON and save as TextFile + .map(k -> { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(k); + }).saveAsTextFile(workingDirPath + "/relatedItemCount", GzipCodec.class); + } + + + + + + + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java new file mode 100644 index 000000000..7245a9064 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.provision; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.provision.scholix.ScholixSummary; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +public class SparkGenerateSummary { + + private static final String jsonIDPath = "$.id"; + + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateSummary.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkExtractRelationCount.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + + final String graphPath = parser.get("graphPath"); + final String workingDirPath = parser.get("workingDirPath"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaPairRDD relationCount = sc.textFile(workingDirPath+"/relatedItemCount").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); + + JavaPairRDD entities = + sc.textFile(graphPath + "/publication") + .filter(ProvisionUtil::isNotDeleted) + .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) + .union( + sc.textFile(graphPath + "/dataset") + .filter(ProvisionUtil::isNotDeleted) + .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) + ) + .union( + sc.textFile(graphPath + "/unknown") + .filter(ProvisionUtil::isNotDeleted) + .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) + ); + entities.join(relationCount).map((Function>, String>) k -> + ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(k._1()), k._2()._1(), k._2()._2())).saveAsTextFile(workingDirPath+"/summary", GzipCodec.class); + + + ; + + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java new file mode 100644 index 000000000..aa1734b2f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.provision.scholix.ScholixSummary; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; + +import java.util.HashMap; +import java.util.Map; + +public class SparkIndexCollectionOnES { + + public static void main(String[] args) throws Exception{ + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkIndexCollectionOnES.class.getResourceAsStream("/eu/dnetlib/dhp/provision/index_on_es.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf().setAppName(SparkIndexCollectionOnES.class.getSimpleName()) + .setMaster(parser.get("master")); + + + final String sourcePath = parser.get("sourcePath"); + final String index = parser.get("index"); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD inputRdd = sc.textFile(sourcePath); + + Map esCfg = new HashMap<>(); + esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + esCfg.put("es.mapping.id", "id"); + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + + + JavaEsSpark.saveJsonToEs(inputRdd,index, esCfg); + + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/CollectedFromType.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/CollectedFromType.java new file mode 100644 index 000000000..2a6f0ab8d --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/CollectedFromType.java @@ -0,0 +1,44 @@ +package eu.dnetlib.dhp.provision.scholix; + +import java.io.Serializable; + +public class CollectedFromType implements Serializable { + + private String datasourceName; + private String datasourceId; + private String completionStatus; + + + public CollectedFromType() { + } + + public CollectedFromType(String datasourceName, String datasourceId, String completionStatus) { + this.datasourceName = datasourceName; + this.datasourceId = datasourceId; + this.completionStatus = completionStatus; + } + + public String getDatasourceName() { + return datasourceName; + } + + public void setDatasourceName(String datasourceName) { + this.datasourceName = datasourceName; + } + + public String getDatasourceId() { + return datasourceId; + } + + public void setDatasourceId(String datasourceId) { + this.datasourceId = datasourceId; + } + + public String getCompletionStatus() { + return completionStatus; + } + + public void setCompletionStatus(String completionStatus) { + this.completionStatus = completionStatus; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/SchemeValue.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/SchemeValue.java new file mode 100644 index 000000000..6e77fea70 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/SchemeValue.java @@ -0,0 +1,33 @@ +package eu.dnetlib.dhp.provision.scholix; + +import java.io.Serializable; + +public class SchemeValue implements Serializable { + private String scheme; + private String value; + + public SchemeValue() { + + } + + public SchemeValue(String scheme, String value) { + this.scheme = scheme; + this.value = value; + } + + public String getScheme() { + return scheme; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixSummary.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixSummary.java new file mode 100644 index 000000000..690566823 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixSummary.java @@ -0,0 +1,289 @@ +package eu.dnetlib.dhp.provision.scholix; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.provision.RelatedItemInfo; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; +import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; + +public class ScholixSummary implements Serializable { + private String id; + private List localIdentifier; + private Typology typology; + private List title; + private List author; + private List date; + private String description; + private List subject; + private List publisher; + private int relatedPublications; + private int relatedDatasets; + private int relatedUnknown; + private List datasources; + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public List getLocalIdentifier() { + return localIdentifier; + } + + public void setLocalIdentifier(List localIdentifier) { + this.localIdentifier = localIdentifier; + } + + public Typology getTypology() { + return typology; + } + + public void setTypology(Typology typology) { + this.typology = typology; + } + + public List getTitle() { + return title; + } + + public void setTitle(List title) { + this.title = title; + } + + public List getAuthor() { + return author; + } + + public void setAuthor(List author) { + this.author = author; + } + + public List getDate() { + return date; + } + + public void setDate(List date) { + this.date = date; + } + + @JsonProperty("abstract") + public String getDescription() { + return description; + } + + @JsonProperty("abstract") + public void setDescription(String description) { + this.description = description; + } + + public List getSubject() { + return subject; + } + + public void setSubject(List subject) { + this.subject = subject; + } + + public List getPublisher() { + return publisher; + } + + public void setPublisher(List publisher) { + this.publisher = publisher; + } + + public int getRelatedPublications() { + return relatedPublications; + } + + public void setRelatedPublications(int relatedPublications) { + this.relatedPublications = relatedPublications; + } + + public int getRelatedDatasets() { + return relatedDatasets; + } + + public void setRelatedDatasets(int relatedDatasets) { + this.relatedDatasets = relatedDatasets; + } + + public int getRelatedUnknown() { + return relatedUnknown; + } + + public void setRelatedUnknown(int relatedUnknown) { + this.relatedUnknown = relatedUnknown; + } + + public List getDatasources() { + return datasources; + } + + public void setDatasources(List datasources) { + this.datasources = datasources; + } + + + public static String fromJsonOAF(final Typology oafType, final String oafJson, final String relEntityJson) { + try { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + RelatedItemInfo relatedItemInfo = mapper.readValue(relEntityJson, RelatedItemInfo.class); + + switch (oafType) { + case dataset: + return mapper.writeValueAsString(summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo)); + case publication: + return mapper.writeValueAsString(summaryFromPublication(mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo)); + case unknown: + return mapper.writeValueAsString(summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo)); + } + + + } catch (Throwable e) { + throw new RuntimeException(e); + } + + return null; + } + + + private static ScholixSummary summaryFromDataset(final DLIDataset item, final RelatedItemInfo relatedItemInfo) { + ScholixSummary summary = new ScholixSummary(); + summary.setId(item.getId()); + + if (item.getPid() != null) + summary.setLocalIdentifier(item.getPid().stream() + .map(p -> new TypedIdentifier(p.getValue(), p.getQualifier().getClassid())) + .collect(Collectors.toList()) + ); + + summary.setTypology(Typology.dataset); + if (item.getTitle() != null) + summary.setTitle(item.getTitle().stream().map(StructuredProperty::getValue).collect(Collectors.toList())); + + if (item.getAuthor() != null) { + summary.setAuthor(item.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList())); + } + + if (item.getRelevantdate() != null) + summary.setDate( + item.getRelevantdate().stream() + .filter(d -> "date".equalsIgnoreCase(d.getQualifier().getClassname())) + .map(StructuredProperty::getValue) + .collect(Collectors.toList()) + ); + + if (item.getDescription() != null && item.getDescription().size() > 0) + summary.setDescription(item.getDescription().get(0).getValue()); + + if (item.getSubject() != null) { + summary.setSubject(item.getSubject().stream() + .map(s -> new SchemeValue(s.getQualifier().getClassid(), s.getValue())) + .collect(Collectors.toList()) + ); + } + + + summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); + summary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); + summary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown()); + + if (item.getDlicollectedfrom() != null) + summary.setDatasources(item.getDlicollectedfrom().stream() + .map( + c -> new CollectedFromType(c.getName(), c.getId(), c.getCompletionStatus()) + ).collect(Collectors.toList())); + + + return summary; + } + + private static ScholixSummary summaryFromPublication(final DLIPublication item, final RelatedItemInfo relatedItemInfo) { + ScholixSummary summary = new ScholixSummary(); + summary.setId(item.getId()); + + if (item.getPid() != null) + summary.setLocalIdentifier(item.getPid().stream() + .map(p -> new TypedIdentifier(p.getValue(), p.getQualifier().getClassid())) + .collect(Collectors.toList()) + ); + + summary.setTypology(Typology.dataset); + if (item.getTitle() != null) + summary.setTitle(item.getTitle().stream().map(StructuredProperty::getValue).collect(Collectors.toList())); + + if (item.getAuthor() != null) { + summary.setAuthor(item.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList())); + } + + if (item.getRelevantdate() != null) + summary.setDate( + item.getRelevantdate().stream() + .filter(d -> "date".equalsIgnoreCase(d.getQualifier().getClassname())) + .map(StructuredProperty::getValue) + .collect(Collectors.toList()) + ); + + if (item.getDescription() != null && item.getDescription().size() > 0) + summary.setDescription(item.getDescription().get(0).getValue()); + + if (item.getSubject() != null) { + summary.setSubject(item.getSubject().stream() + .map(s -> new SchemeValue(s.getQualifier().getClassid(), s.getValue())) + .collect(Collectors.toList()) + ); + } + + + summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); + summary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); + summary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown()); + + if (item.getDlicollectedfrom() != null) + summary.setDatasources(item.getDlicollectedfrom().stream() + .map( + c -> new CollectedFromType(c.getName(), c.getId(), c.getCompletionStatus()) + ).collect(Collectors.toList())); + + + return summary; + } + + private static ScholixSummary summaryFromUnknown(final DLIUnknown item, final RelatedItemInfo relatedItemInfo) { + ScholixSummary summary = new ScholixSummary(); + summary.setId(item.getId()); + if (item.getPid() != null) + summary.setLocalIdentifier(item.getPid().stream() + .map(p -> new TypedIdentifier(p.getValue(), p.getQualifier().getClassid())) + .collect(Collectors.toList()) + ); + + summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); + summary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); + summary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown()); + + if (item.getDlicollectedfrom() != null) + summary.setDatasources(item.getDlicollectedfrom().stream() + .map( + c -> new CollectedFromType(c.getName(), c.getId(), c.getCompletionStatus()) + ).collect(Collectors.toList())); + + + return summary; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/TypedIdentifier.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/TypedIdentifier.java new file mode 100644 index 000000000..5d9ced6cf --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/TypedIdentifier.java @@ -0,0 +1,32 @@ +package eu.dnetlib.dhp.provision.scholix; + +import java.io.Serializable; + +public class TypedIdentifier implements Serializable { + private String id; + private String type; + + public TypedIdentifier() { + } + + public TypedIdentifier(String id, String type) { + this.id = id; + this.type = type; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Typology.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Typology.java new file mode 100644 index 000000000..78ddcae51 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Typology.java @@ -0,0 +1,9 @@ +package eu.dnetlib.dhp.provision.scholix; + +import java.io.Serializable; + +public enum Typology implements Serializable { + dataset, + publication, + unknown +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/config-default.xml new file mode 100644 index 000000000..6fb2a1253 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/config-default.xml @@ -0,0 +1,10 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml new file mode 100644 index 000000000..7e509d7bf --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml @@ -0,0 +1,100 @@ + + + + workingDirPath + the source path + + + graphPath + the graph path + + + index + index name + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + calculate for each ID the number of related Dataset, publication and Unknown + eu.dnetlib.dhp.provision.SparkExtractRelationCount + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + -mt yarn-cluster + --workingDirPath${workingDirPath} + --relationPath${graphPath}/relation + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + generate Summary + eu.dnetlib.dhp.provision.SparkGenerateSummary + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + -mt yarn-cluster + --workingDirPath${workingDirPath} + --graphPath${graphPath} + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + generate Summary + eu.dnetlib.dhp.provision.SparkIndexCollectionOnES + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --num-executors 20 --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + -mt yarn-cluster + --sourcePath${workingDirPath}/summary + --index${index}_object + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json new file mode 100644 index 000000000..e1c30ba39 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the working path where generated files", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "index", + "paramDescription": "the index name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json new file mode 100644 index 000000000..37fbffb9b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingDirPath", + "paramDescription": "the working path where generated files", + "paramRequired": true + }, + { + "paramName": "g", + "paramLongName": "graphPath", + "paramDescription": "the relationPath path ", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_related_entities_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_related_entities_parameters.json new file mode 100644 index 000000000..4106ab352 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/input_related_entities_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingDirPath", + "paramDescription": "the working path where generated files", + "paramRequired": true + }, + { + "paramName": "r", + "paramLongName": "relationPath", + "paramDescription": "the relationPath path ", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java new file mode 100644 index 000000000..a45ee5d18 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java @@ -0,0 +1,48 @@ +package eu.dnetlib.dhp.provision; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.provision.scholix.ScholixSummary; +import org.apache.commons.io.IOUtils; +import org.junit.Ignore; +import org.junit.Test; + +public class ExtractInfoTest { + + @Test + public void test() throws Exception { + + final String json = IOUtils.toString(getClass().getResourceAsStream("record.json")); + + + ProvisionUtil.getItemType(json,ProvisionUtil.TARGETJSONPATH); + + } + + + @Test + public void testSerialization() throws Exception { + + ScholixSummary summary = new ScholixSummary(); + summary.setDescription("descrizione"); + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(summary); + System.out.println(json); + System.out.println(mapper.readValue(json, ScholixSummary.class).getDescription()); + } + + + @Test + @Ignore + public void testIndex() throws Exception { + SparkIndexCollectionOnES.main( + + new String[] { + "-mt", "local[*]", + "-s", "/home/sandro/dli", + "-i", "dli_object" + } + ); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/provision/record.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/provision/record.json new file mode 100644 index 000000000..a79e7334f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/provision/record.json @@ -0,0 +1 @@ +{"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"relType":"references","subRelType":null,"relClass":"datacite","source":"50|f2123fce7e56c73dc8f1bf64ec59b477","target":"50|b618cbe39ba940a29993ac324e5f9621","collectedFrom":[{"key":"dli_________::datacite","value":"Datasets in Datacite","dataInfo":null}]} \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index cf71190a4..06986547e 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -18,6 +18,7 @@ dhp-distcp dhp-graph-mapper dhp-dedup + dhp-graph-provision diff --git a/pom.xml b/pom.xml index ada3a33a4..039b94d44 100644 --- a/pom.xml +++ b/pom.xml @@ -243,6 +243,14 @@ ${vtd.version} + + org.elasticsearch + elasticsearch-hadoop + 7.6.0 + + + + org.apache.oozie oozie-client