From 0594b92a6d92509c5b1ed8a21af6496893508884 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 19 Mar 2020 11:11:07 +0100 Subject: [PATCH] implemented relation with dataset --- .../dnetlib/dedup/SparkUpdateEntityJob.java | 2 - .../SparkScholexplorerMergeEntitiesJob.java | 8 + .../dnetlib/dhp/provision/DatasetJoiner.scala | 29 ++ .../dnetlib/dhp/provision/ProvisionUtil.java | 30 +- .../dhp/provision/RelatedItemInfo.java | 48 ++- .../provision/SparkExtractRelationCount.java | 58 +-- .../dhp/provision/SparkGenerateScholix.java | 114 +++--- .../dhp/provision/SparkGenerateSummary.java | 67 +++- .../provision/SparkIndexCollectionOnES.java | 24 +- .../dhp/provision/scholix/Scholix.java | 70 +++- .../scholix/ScholixCollectedFrom.java | 9 +- .../provision/scholix/ScholixEntityId.java | 6 +- .../provision/scholix/ScholixIdentifier.java | 6 +- .../scholix/ScholixRelationship.java | 9 +- .../provision/scholix/ScholixResource.java | 30 +- .../scholix/summary/ScholixSummary.java | 50 ++- .../provision/oozie_app/workflow.xml | 52 +-- .../eu/dnetlib/dhp/provision/index_on_es.json | 7 + .../dnetlib/dhp/provision/scholix_index.json | 331 ++++++++++++++++++ .../dnetlib/dhp/provision/summary_index.json | 132 +++++++ .../dhp/provision/ExtractInfoTest.java | 19 +- 21 files changed, 847 insertions(+), 254 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/scholix_index.json create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/summary_index.json diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java index 3ea7982d1..396349481 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java @@ -25,8 +25,6 @@ import java.io.IOException; public class SparkUpdateEntityJob { final static String IDJSONPATH = "$.id"; - final static String SOURCEJSONPATH = "$.source"; - final static String TARGETJSONPATH = "$.target"; public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json"))); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java index d3c257fc6..d9b88c8b2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java @@ -159,6 +159,14 @@ public class SparkScholexplorerMergeEntitiesJob { } , Encoders.bean(Relation.class)); secondJoin.write().mode(SaveMode.Overwrite).save(targetPath+"_fixed"); + + + FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration()); + + + fileSystem.delete(new Path(targetPath), true); + fileSystem.rename(new Path(targetPath+"_fixed"),new Path(targetPath)); + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala new file mode 100644 index 000000000..a550bff34 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala @@ -0,0 +1,29 @@ +package eu.dnetlib.dhp.provision + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.{coalesce, col, count, lit} + +object DatasetJoiner { + + def startJoin(spark: SparkSession, relPath:String, targetPath:String) { + val relation = spark.read.load(relPath) + + val relatedPublication = relation.where("target like '50%'").groupBy("source").agg(count("target").as("publication")).select(col("source"). alias("p_source"), col("publication")) + val relatedDataset = relation.where("target like '60%'").groupBy("source").agg(count("target").as("dataset")).select(col("source"). alias("d_source"), col("dataset")) + val relatedUnknown = relation.where("target like '70%'").groupBy("source").agg(count("target").as("unknown")).select(col("source"). alias("u_source"), col("unknown")) + val firstJoin = relatedPublication + .join(relatedDataset,col("p_source").equalTo(col("d_source")),"full") + .select(coalesce(col("p_source"), col("d_source")).alias("id"), + col("publication"), + col("dataset")) + .join(relatedUnknown, col("u_source").equalTo(col("id")),"full") + .select(coalesce(col("u_source"), col("id")).alias("source"), + coalesce(col("publication"),lit(0)).alias("relatedPublication"), + coalesce(col("dataset"),lit(0)).alias("relatedDataset"), + coalesce(col("unknown"),lit(0)).alias("relatedUnknown") + ) + firstJoin.write.mode("overwrite").save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java index cd797f44c..aed444660 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/ProvisionUtil.java @@ -10,21 +10,21 @@ public class ProvisionUtil { public final static String TARGETJSONPATH = "$.target"; public final static String SOURCEJSONPATH = "$.source"; - public static RelatedItemInfo getItemType(final String item, final String idPath) { - String targetId = DHPUtils.getJPathString(idPath, item); - switch (StringUtils.substringBefore(targetId, "|")) { - case "50": - return new RelatedItemInfo().setRelatedPublication(1); - case "60": - return new RelatedItemInfo().setRelatedDataset(1); - case "70": - return new RelatedItemInfo().setRelatedUnknown(1); - default: - throw new RuntimeException("Unknonw target ID"); - - } - - } +// public static RelatedItemInfo getItemType(final String item, final String idPath) { +// String targetId = DHPUtils.getJPathString(idPath, item); +// switch (StringUtils.substringBefore(targetId, "|")) { +// case "50": +// return new RelatedItemInfo(null,0,1,0); +// case "60": +// return new RelatedItemInfo(null,1,0,0); +// case "70": +// return new RelatedItemInfo(null,0,0,1); +// default: +// throw new RuntimeException("Unknonw target ID"); +// +// } +// +// } public static Boolean isNotDeleted(final String item) { return !"true".equalsIgnoreCase(DHPUtils.getJPathString(deletedByInferenceJPATH, item)); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java index bf89b3115..3b07aab8d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/RelatedItemInfo.java @@ -8,57 +8,53 @@ import java.io.Serializable; public class RelatedItemInfo implements Serializable { - private String id; + private String source; - private int relatedDataset = 0; + private long relatedDataset = 0; - private int relatedPublication = 0; + private long relatedPublication = 0; - private int relatedUnknown = 0; + private long relatedUnknown = 0; - - public String getId() { - return id; + public RelatedItemInfo() { } - public RelatedItemInfo setId(String id) { - this.id = id; - return this; + public RelatedItemInfo(String source, long relatedDataset, long relatedPublication, long relatedUnknown) { + this.source = source; + this.relatedDataset = relatedDataset; + this.relatedPublication = relatedPublication; + this.relatedUnknown = relatedUnknown; } - public RelatedItemInfo add(RelatedItemInfo other) { - if (other != null) { - relatedDataset += other.getRelatedDataset(); - relatedPublication += other.getRelatedPublication(); - relatedUnknown += other.getRelatedUnknown(); - } - return this; + public String getSource() { + return source; } - public int getRelatedDataset() { + public void setSource(String source) { + this.source = source; + } + + public long getRelatedDataset() { return relatedDataset; } - public RelatedItemInfo setRelatedDataset(int relatedDataset) { + public void setRelatedDataset(long relatedDataset) { this.relatedDataset = relatedDataset; - return this; } - public int getRelatedPublication() { + public long getRelatedPublication() { return relatedPublication; } - public RelatedItemInfo setRelatedPublication(int relatedPublication) { + public void setRelatedPublication(long relatedPublication) { this.relatedPublication = relatedPublication; - return this; } - public int getRelatedUnknown() { + public long getRelatedUnknown() { return relatedUnknown; } - public RelatedItemInfo setRelatedUnknown(int relatedUnknown) { + public void setRelatedUnknown(int relatedUnknown) { this.relatedUnknown = relatedUnknown; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java index d3991448f..fc96db201 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java @@ -1,19 +1,22 @@ package eu.dnetlib.dhp.provision; import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.JsonPath; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.DHPUtils; -import net.minidev.json.JSONArray; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.expressions.Expression; import scala.Tuple2; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + /** * SparkExtractRelationCount is a spark job that takes in input relation RDD @@ -42,27 +45,34 @@ public class SparkExtractRelationCount { final String relationPath = parser.get("relationPath"); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - sc.textFile(relationPath) - // We start to Filter the relation not deleted by Inference - .filter(ProvisionUtil::isNotDeleted) - // Then we create a PairRDD - .mapToPair((PairFunction) f - -> new Tuple2<>(DHPUtils.getJPathString(ProvisionUtil.SOURCEJSONPATH, f), ProvisionUtil.getItemType(f, ProvisionUtil.TARGETJSONPATH))) - //We reduce and sum the number of Relations - .reduceByKey((Function2) (v1, v2) -> { - if (v1 == null && v2 == null) - return new RelatedItemInfo(); - return v1 != null ? v1.add(v2) : v2; - }) - //Set the source Id in RelatedItem object - .map(k -> k._2().setId(k._1())) - // Convert to JSON and save as TextFile - .map(k -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(k); - }).saveAsTextFile(workingDirPath + "/relatedItemCount", GzipCodec.class); + + + + DatasetJoiner.startJoin(spark, relationPath,workingDirPath + "/relatedItemCount"); + + + + +// sc.textFile(relationPath) +// // We start to Filter the relation not deleted by Inference +// .filter(ProvisionUtil::isNotDeleted) +// // Then we create a PairRDD +// .mapToPair((PairFunction) f +// -> new Tuple2<>(DHPUtils.getJPathString(ProvisionUtil.SOURCEJSONPATH, f), ProvisionUtil.getItemType(f, ProvisionUtil.TARGETJSONPATH))) +// //We reduce and sum the number of Relations +// .reduceByKey((Function2) (v1, v2) -> { +// if (v1 == null && v2 == null) +// return new RelatedItemInfo(); +// return v1 != null ? v1.add(v2) : v2; +// }) +// //Set the source Id in RelatedItem object +// .map(k -> k._2().setId(k._1())) +// // Convert to JSON and save as TextFile +// .map(k -> { +// ObjectMapper mapper = new ObjectMapper(); +// return mapper.writeValueAsString(k); +// }).saveAsTextFile(workingDirPath + "/relatedItemCount", GzipCodec.class); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java index 2e08849cd..104cefce2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java @@ -1,16 +1,22 @@ package eu.dnetlib.dhp.provision; -import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.provision.scholix.Scholix; -import eu.dnetlib.dhp.provision.scholix.ScholixResource; -import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; +import eu.dnetlib.dhp.provision.scholix.*; +import eu.dnetlib.dhp.provision.scholix.summary.*; +import eu.dnetlib.dhp.schema.oaf.Relation; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; + +import static org.apache.spark.sql.functions.col; + +import scala.Int; import scala.Tuple2; import java.util.ArrayList; @@ -19,19 +25,34 @@ import java.util.Random; public class SparkGenerateScholix { - private static final String jsonIDPath = "$.id"; - private static final String sourceIDPath = "$.source"; - private static final String targetIDPath = "$.target"; - - - - public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholix.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))); parser.parseArgument(args); + + + SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions","4000"); +// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); +// conf.registerKryoClasses(new Class[]{ +// ScholixSummary.class, +// CollectedFromType.class, +// SchemeValue.class, +// TypedIdentifier.class, +// Typology.class, +// Relation.class, +// Scholix.class, +// ScholixCollectedFrom.class, +// ScholixEntityId.class, +// ScholixIdentifier.class, +// ScholixRelationship.class, +// ScholixResource.class +// }); + + final SparkSession spark = SparkSession .builder() + .config(conf) .appName(SparkExtractRelationCount.class.getSimpleName()) .master(parser.get("master")) .getOrCreate(); @@ -42,51 +63,30 @@ public class SparkGenerateScholix { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final Dataset scholixSummary = spark.read().load(workingDirPath + "/summary").as(Encoders.bean(ScholixSummary.class)); + final Dataset rels = spark.read().load(graphPath + "/relation").as(Encoders.bean(Relation.class)); -// final JavaRDD relationToExport = sc.textFile(graphPath + "/relation").filter(ProvisionUtil::isNotDeleted).repartition(4000); - final JavaPairRDD scholixSummary = - sc.textFile(workingDirPath + "/summary") - .flatMapToPair((PairFlatMapFunction) i -> { - final ObjectMapper mapper = new ObjectMapper(); - final ScholixSummary summary = mapper.readValue(i, ScholixSummary.class); - ScholixResource tmp = ScholixResource.fromSummary(summary); - final List> result = new ArrayList<>(); - for (int k = 0; k<10; k++) - result.add(new Tuple2<>(String.format("%s::%d", tmp.getDnetIdentifier(), k), tmp)); - return result.iterator(); - }); -// scholixSummary.join( -// relationToExport -// .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(sourceIDPath, i), i))) -// .map(Tuple2::_2) -// .mapToPair(summaryRelation -> -// new Tuple2<>( -// DHPUtils.getJPathString(targetIDPath, summaryRelation._2()), -// Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2()))) -// -// .map(t-> t._2().setTarget(new ScholixResource().setDnetIdentifier(t._1()))) -// .map(s-> { -// ObjectMapper mapper = new ObjectMapper(); -// return mapper.writeValueAsString(s); -// }) -// .saveAsTextFile(workingDirPath + "/scholix", GzipCodec.class); - sc.textFile(workingDirPath + "/scholix") - .mapToPair(t -> { - ObjectMapper mapper = new ObjectMapper(); - Scholix scholix = mapper.readValue(t, Scholix.class); - Random rand = new Random(); - return new Tuple2<>(String.format("%s::%d",scholix.getTarget().getDnetIdentifier(), rand.nextInt(10)), scholix); - }) - .join(scholixSummary) - .map(t-> { - Scholix item = t._2()._1().setTarget(t._2()._2()); - item.generateIdentifier(); - return item; - }) - .map(s-> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(workingDirPath + "/scholix_index", GzipCodec.class); + Dataset firstJoin = scholixSummary.joinWith(rels, scholixSummary.col("id").equalTo(rels.col("source"))) + .map((MapFunction, Scholix>) f -> Scholix.generateScholixWithSource(f._1(), f._2()), Encoders.bean(Scholix.class)); + + firstJoin.write().mode(SaveMode.Overwrite).save(workingDirPath+"/scholix_1"); + firstJoin = spark.read().load(workingDirPath+"/scholix_1").as(Encoders.bean(Scholix.class)); + + + + Dataset scholix_final = spark.read().load(workingDirPath+"/scholix_1").as(Encoders.bean(Scholix.class)); + + Dataset target = spark.read().load(workingDirPath+"/scholix_target").as(Encoders.bean(ScholixResource.class)); + + scholix_final.joinWith(target, scholix_final.col("identifier").equalTo(target.col("dnetIdentifier")), "inner") + .map((MapFunction, Scholix>) f -> { + final Scholix scholix = f._1(); + final ScholixResource scholixTarget = f._2(); + scholix.setTarget(scholixTarget); + scholix.generateIdentifier(); + scholix.generatelinkPublisher(); + return scholix; + }, Encoders.bean(Scholix.class)).repartition(5000).write().mode(SaveMode.Overwrite).save(workingDirPath+"/scholix_index"); } - - - } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java index a8cdf6dd5..39b7a9468 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java @@ -1,14 +1,19 @@ package eu.dnetlib.dhp.provision; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import scala.Tuple2; @@ -31,27 +36,53 @@ public class SparkGenerateSummary { final String workingDirPath = parser.get("workingDirPath"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaPairRDD relationCount = sc.textFile(workingDirPath+"/relatedItemCount").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); - JavaPairRDD entities = - sc.textFile(graphPath + "/publication") - .filter(ProvisionUtil::isNotDeleted) - .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - .union( - sc.textFile(graphPath + "/dataset") - .filter(ProvisionUtil::isNotDeleted) - .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - ) - .union( - sc.textFile(graphPath + "/unknown") - .filter(ProvisionUtil::isNotDeleted) - .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - ); - entities.join(relationCount).map((Function>, String>) k -> - ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(k._1()), k._2()._1(), k._2()._2())).saveAsTextFile(workingDirPath+"/summary", GzipCodec.class); + Dataset rInfo = spark.read().load(workingDirPath + "/relatedItemCount").as(Encoders.bean(RelatedItemInfo.class)); - ; + Dataset entity = spark.createDataset(sc.textFile(graphPath + "/publication," + graphPath + "/dataset," + graphPath + "/unknown") + .map(s -> + ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(DHPUtils.getJPathString(jsonIDPath, s)), s) + + + ).rdd(), Encoders.bean(ScholixSummary.class)); + + + Dataset summaryComplete = rInfo.joinWith(entity, rInfo.col("source").equalTo(entity.col("id"))).map((MapFunction, ScholixSummary>) t -> + { + ScholixSummary scholixSummary = t._2(); + RelatedItemInfo relatedItemInfo = t._1(); + scholixSummary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); + scholixSummary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); + scholixSummary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown()); + return scholixSummary; + }, Encoders.bean(ScholixSummary.class) + ); + + summaryComplete.write().save(workingDirPath+"/summary"); + + +// JavaPairRDD relationCount = sc.textFile(workingDirPath+"/relatedItemCount").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); +// +// JavaPairRDD entities = +// sc.textFile(graphPath + "/publication") +// .filter(ProvisionUtil::isNotDeleted) +// .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) +// .union( +// sc.textFile(graphPath + "/dataset") +// .filter(ProvisionUtil::isNotDeleted) +// .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) +// ) +// .union( +// sc.textFile(graphPath + "/unknown") +// .filter(ProvisionUtil::isNotDeleted) +// .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) +// ); +// entities.join(relationCount).map((Function>, String>) k -> +// ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(k._1()), k._2()._1(), k._2()._2())).saveAsTextFile(workingDirPath+"/summary", GzipCodec.class); +// +// +// ; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java index 7f240cbef..ce3c6315c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java @@ -1,13 +1,20 @@ package eu.dnetlib.dhp.provision; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.provision.scholix.Scholix; +import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import java.nio.file.attribute.AclFileAttributeView; import java.util.HashMap; import java.util.Map; @@ -21,17 +28,30 @@ public class SparkIndexCollectionOnES { SparkConf conf = new SparkConf().setAppName(SparkIndexCollectionOnES.class.getSimpleName()) .setMaster(parser.get("master")); + conf.set("spark.sql.shuffle.partitions","4000"); + final String sourcePath = parser.get("sourcePath"); final String index = parser.get("index"); final String idPath = parser.get("idPath"); + final String type = parser.get("type"); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD inputRdd = sc.textFile(sourcePath); + JavaRDD inputRdd; + + + if("summary".equalsIgnoreCase(type)) + inputRdd = spark.read().load(sourcePath).as(Encoders.bean(ScholixSummary.class)).map((MapFunction) f -> { + final ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(f); + }, Encoders.STRING()).javaRDD(); + + else + inputRdd = sc.textFile(sourcePath); Map esCfg = new HashMap<>(); esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); @@ -40,8 +60,6 @@ public class SparkIndexCollectionOnES { esCfg.put("es.batch.write.retry.wait", "60s"); esCfg.put("es.batch.size.entries", "200"); esCfg.put("es.nodes.wan.only", "true"); - - JavaEsSpark.saveJsonToEs(inputRdd,index, esCfg); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java index 3ebccfea0..c3ccf6899 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java @@ -5,8 +5,7 @@ import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.DHPUtils; import java.io.Serializable; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; public class Scholix implements Serializable { @@ -25,6 +24,20 @@ public class Scholix implements Serializable { private String identifier; + public Scholix clone(final ScholixResource t) { + final Scholix clone = new Scholix(); + clone.setPublicationDate(publicationDate); + clone.setPublisher(publisher); + clone.setLinkprovider(linkprovider); + clone.setRelationship(relationship); + clone.setSource(source); + clone.setTarget(t); + clone.generatelinkPublisher(); + clone.generateIdentifier(); + return clone; + } + + public static Scholix generateScholixWithSource(final String sourceSummaryJson, final String relation) { final ObjectMapper mapper = new ObjectMapper(); @@ -46,8 +59,36 @@ public class Scholix implements Serializable { } } + public static Scholix generateScholixWithSource(final ScholixSummary scholixSummary, final Relation rel) { + final Scholix s = new Scholix(); + if (scholixSummary.getDate() != null && scholixSummary.getDate().size()>0) + s.setPublicationDate(scholixSummary.getDate().get(0)); + s.setLinkprovider(rel.getCollectedFrom().stream().map(cf -> + new ScholixEntityId(cf.getValue(), Collections.singletonList( + new ScholixIdentifier(cf.getKey(), "dnet_identifier") + ))).collect(Collectors.toList())); + s.setRelationship(new ScholixRelationship(rel.getRelType(),rel.getRelClass(),null )); + s.setSource(ScholixResource.fromSummary(scholixSummary)); - public void generateIdentifier( ) { + s.setIdentifier(rel.getTarget()); +// ScholixResource mockTarget = new ScholixResource(); +// mockTarget.setDnetIdentifier(rel.getTarget()); +// s.setTarget(mockTarget); +// s.generateIdentifier(); + return s; + } + + + public void generatelinkPublisher() { + Set publisher = new HashSet<>(); + if (source.getPublisher() != null) + publisher.addAll(source.getPublisher().stream().map(ScholixEntityId::getName).collect(Collectors.toList())); + if (target.getPublisher() != null) + publisher.addAll(target.getPublisher().stream().map(ScholixEntityId::getName).collect(Collectors.toList())); + this.publisher = publisher.stream().map(k -> new ScholixEntityId(k ,null)).collect(Collectors.toList()); + } + + public void generateIdentifier( ) { setIdentifier(DHPUtils.md5(String.format("%s::%s::%s",source.getDnetIdentifier(),relationship.getName(), target.getDnetIdentifier()))); } @@ -65,67 +106,58 @@ public class Scholix implements Serializable { } } - public String getPublicationDate() { return publicationDate; } - public Scholix setPublicationDate(String publicationDate) { + public void setPublicationDate(String publicationDate) { this.publicationDate = publicationDate; - return this; } public List getPublisher() { return publisher; } - public Scholix setPublisher(List publisher) { + public void setPublisher(List publisher) { this.publisher = publisher; - return this; } public List getLinkprovider() { return linkprovider; } - public Scholix setLinkprovider(List linkprovider) { + public void setLinkprovider(List linkprovider) { this.linkprovider = linkprovider; - return this; } public ScholixRelationship getRelationship() { return relationship; } - public Scholix setRelationship(ScholixRelationship relationship) { + public void setRelationship(ScholixRelationship relationship) { this.relationship = relationship; - return this; } public ScholixResource getSource() { return source; } - public Scholix setSource(ScholixResource source) { + public void setSource(ScholixResource source) { this.source = source; - return this; } public ScholixResource getTarget() { return target; } - public Scholix setTarget(ScholixResource target) { + public void setTarget(ScholixResource target) { this.target = target; - return this; } public String getIdentifier() { return identifier; } - - public Scholix setIdentifier(String identifier) { + public void setIdentifier(String identifier) { this.identifier = identifier; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixCollectedFrom.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixCollectedFrom.java index 62da993ba..2ba84188d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixCollectedFrom.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixCollectedFrom.java @@ -21,26 +21,23 @@ public class ScholixCollectedFrom implements Serializable { return provider; } - public ScholixCollectedFrom setProvider(ScholixEntityId provider) { + public void setProvider(ScholixEntityId provider) { this.provider = provider; - return this; } public String getProvisionMode() { return provisionMode; } - public ScholixCollectedFrom setProvisionMode(String provisionMode) { + public void setProvisionMode(String provisionMode) { this.provisionMode = provisionMode; - return this; } public String getCompletionStatus() { return completionStatus; } - public ScholixCollectedFrom setCompletionStatus(String completionStatus) { + public void setCompletionStatus(String completionStatus) { this.completionStatus = completionStatus; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixEntityId.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixEntityId.java index a2e307e6e..0f43a8d44 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixEntityId.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixEntityId.java @@ -19,17 +19,15 @@ public class ScholixEntityId implements Serializable { return name; } - public ScholixEntityId setName(String name) { + public void setName(String name) { this.name = name; - return this; } public List getIdentifiers() { return identifiers; } - public ScholixEntityId setIdentifiers(List identifiers) { + public void setIdentifiers(List identifiers) { this.identifiers = identifiers; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixIdentifier.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixIdentifier.java index 9adac698d..f354ef10a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixIdentifier.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixIdentifier.java @@ -18,17 +18,15 @@ public class ScholixIdentifier implements Serializable { return identifier; } - public ScholixIdentifier setIdentifier(String identifier) { + public void setIdentifier(String identifier) { this.identifier = identifier; - return this; } public String getSchema() { return schema; } - public ScholixIdentifier setSchema(String schema) { + public void setSchema(String schema) { this.schema = schema; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixRelationship.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixRelationship.java index 9bcb9222b..1a35038b9 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixRelationship.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixRelationship.java @@ -20,26 +20,23 @@ public class ScholixRelationship implements Serializable { return name; } - public ScholixRelationship setName(String name) { + public void setName(String name) { this.name = name; - return this; } public String getSchema() { return schema; } - public ScholixRelationship setSchema(String schema) { + public void setSchema(String schema) { this.schema = schema; - return this; } public String getInverse() { return inverse; } - public ScholixRelationship setInverse(String inverse) { + public void setInverse(String inverse) { this.inverse = inverse; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java index abcb398b5..49b891e65 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java @@ -21,6 +21,9 @@ public class ScholixResource implements Serializable { private List collectedFrom; + + + public static ScholixResource fromSummary(ScholixSummary summary) { final ScholixResource resource = new ScholixResource(); @@ -66,80 +69,71 @@ public class ScholixResource implements Serializable { return identifier; } - public ScholixResource setIdentifier(List identifier) { + public void setIdentifier(List identifier) { this.identifier = identifier; - return this; } public String getDnetIdentifier() { return dnetIdentifier; } - public ScholixResource setDnetIdentifier(String dnetIdentifier) { + public void setDnetIdentifier(String dnetIdentifier) { this.dnetIdentifier = dnetIdentifier; - return this; } public String getObjectType() { return objectType; } - public ScholixResource setObjectType(String objectType) { + public void setObjectType(String objectType) { this.objectType = objectType; - return this; } public String getObjectSubType() { return objectSubType; } - public ScholixResource setObjectSubType(String objectSubType) { + public void setObjectSubType(String objectSubType) { this.objectSubType = objectSubType; - return this; } public String getTitle() { return title; } - public ScholixResource setTitle(String title) { + public void setTitle(String title) { this.title = title; - return this; } public List getCreator() { return creator; } - public ScholixResource setCreator(List creator) { + public void setCreator(List creator) { this.creator = creator; - return this; } public String getPublicationDate() { return publicationDate; } - public ScholixResource setPublicationDate(String publicationDate) { + public void setPublicationDate(String publicationDate) { this.publicationDate = publicationDate; - return this; } public List getPublisher() { return publisher; } - public ScholixResource setPublisher(List publisher) { + public void setPublisher(List publisher) { this.publisher = publisher; - return this; } public List getCollectedFrom() { return collectedFrom; } - public ScholixResource setCollectedFrom(List collectedFrom) { + public void setCollectedFrom(List collectedFrom) { this.collectedFrom = collectedFrom; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java index 8cde8e679..26538d156 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java @@ -11,6 +11,7 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -24,9 +25,9 @@ public class ScholixSummary implements Serializable { private String description; private List subject; private List publisher; - private int relatedPublications; - private int relatedDatasets; - private int relatedUnknown; + private long relatedPublications; + private long relatedDatasets; + private long relatedUnknown; private List datasources; @@ -104,27 +105,27 @@ public class ScholixSummary implements Serializable { this.publisher = publisher; } - public int getRelatedPublications() { + public long getRelatedPublications() { return relatedPublications; } - public void setRelatedPublications(int relatedPublications) { + public void setRelatedPublications(long relatedPublications) { this.relatedPublications = relatedPublications; } - public int getRelatedDatasets() { + public long getRelatedDatasets() { return relatedDatasets; } - public void setRelatedDatasets(int relatedDatasets) { + public void setRelatedDatasets(long relatedDatasets) { this.relatedDatasets = relatedDatasets; } - public int getRelatedUnknown() { + public long getRelatedUnknown() { return relatedUnknown; } - public void setRelatedUnknown(int relatedUnknown) { + public void setRelatedUnknown(long relatedUnknown) { this.relatedUnknown = relatedUnknown; } @@ -137,6 +138,25 @@ public class ScholixSummary implements Serializable { } + public static ScholixSummary fromJsonOAF(final Typology oafType, final String oafJson) { + try { + final ObjectMapper mapper = new ObjectMapper(); + final RelatedItemInfo relatedItemInfo = new RelatedItemInfo(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + switch (oafType) { + case dataset: + return summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo); + case publication: + return summaryFromPublication(mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo); + case unknown: + return summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo); + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + return null; + } + public static String fromJsonOAF(final Typology oafType, final String oafJson, final String relEntityJson) { try { final ObjectMapper mapper = new ObjectMapper(); @@ -197,7 +217,8 @@ public class ScholixSummary implements Serializable { .collect(Collectors.toList()) ); } - + if (item.getPublisher()!= null) + summary.setPublisher(Collections.singletonList(item.getPublisher().getValue())); summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); summary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); @@ -208,12 +229,10 @@ public class ScholixSummary implements Serializable { .map( c -> new CollectedFromType(c.getName(), c.getId(), c.getCompletionStatus()) ).collect(Collectors.toList())); - - return summary; } - private static ScholixSummary summaryFromPublication(final DLIPublication item, final RelatedItemInfo relatedItemInfo) { + private static ScholixSummary summaryFromPublication(final DLIPublication item, final RelatedItemInfo relatedItemInfo) { ScholixSummary summary = new ScholixSummary(); summary.setId(item.getId()); @@ -249,6 +268,9 @@ public class ScholixSummary implements Serializable { ); } + if (item.getPublisher()!= null) + summary.setPublisher(Collections.singletonList(item.getPublisher().getValue())); + summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); summary.setRelatedPublications(relatedItemInfo.getRelatedPublication()); @@ -264,7 +286,7 @@ public class ScholixSummary implements Serializable { return summary; } - private static ScholixSummary summaryFromUnknown(final DLIUnknown item, final RelatedItemInfo relatedItemInfo) { + private static ScholixSummary summaryFromUnknown(final DLIUnknown item, final RelatedItemInfo relatedItemInfo) { ScholixSummary summary = new ScholixSummary(); summary.setId(item.getId()); if (item.getPid() != null) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml index 83f386f5c..1102ec4c1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml @@ -83,7 +83,25 @@ --workingDirPath${workingDirPath} --graphPath${graphPath} - + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + generate Scholix + eu.dnetlib.dhp.provision.SparkGenerateScholix + dhp-graph-provision-${projectVersion}.jar + --executor-memory 9G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + -mt yarn-cluster + --workingDirPath${workingDirPath} + --graphPath${graphPath} + + @@ -96,36 +114,17 @@ generate Summary eu.dnetlib.dhp.provision.SparkIndexCollectionOnES dhp-graph-provision-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --num-executors 20 --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="64" -mt yarn-cluster --sourcePath${workingDirPath}/summary --index${index}_object - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - generate Scholix - eu.dnetlib.dhp.provision.SparkGenerateScholix - dhp-graph-provision-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - -mt yarn-cluster - --workingDirPath${workingDirPath} - --graphPath${graphPath} + --idPathid + --typesummary - ${jobTracker} @@ -135,15 +134,16 @@ index scholix eu.dnetlib.dhp.provision.SparkIndexCollectionOnES dhp-graph-provision-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --num-executors 20 --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="16" -mt yarn-cluster - --sourcePath${workingDirPath}/scholix_index + --sourcePath${workingDirPath}/scholix_json --index${index}_scholix + --idPathidentifier + --typescholix - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json index d4904d8d3..905b6d514 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json @@ -17,6 +17,13 @@ "paramDescription": "the index name", "paramRequired": true }, + + { + "paramName": "t", + "paramLongName": "type", + "paramDescription": "should be scholix or summary", + "paramRequired": true + }, { "paramName": "id", "paramLongName": "idPath", diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/scholix_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/scholix_index.json new file mode 100644 index 000000000..02718c1d3 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/scholix_index.json @@ -0,0 +1,331 @@ +{ + "mappings": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "linkprovider": { + "type": "nested", + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "keyword" + } + } + }, + "publicationDate": { + "type": "keyword" + }, + "relationship": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "source": { + "type": "nested", + "properties": { + "collectedFrom": { + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "provider": { + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "provisionMode": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "creator": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "dnetIdentifier": { + "type": "keyword" + }, + "identifier": { + "type": "nested", + "properties": { + "identifier": { + "type": "keyword" + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "keyword" + } + } + }, + "objectType": { + "type": "keyword" + }, + "publicationDate": { + "type": "keyword" + }, + "publisher": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "target": { + "type": "nested", + "properties": { + "collectedFrom": { + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "provider": { + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "provisionMode": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "creator": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "dnetIdentifier": { + "type": "keyword" + }, + "identifier": { + "type": "nested", + "properties": { + "identifier": { + "type": "keyword" + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "keyword" + } + } + }, + "objectType": { + "type": "keyword" + }, + "publicationDate": { + "type": "keyword" + }, + "publisher": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "settings": { + "index": { + "refresh_interval": "600s", + "number_of_shards": "48", + "translog": { + "sync_interval": "15s", + "durability": "ASYNC" + }, + "analysis": { + "analyzer": { + "analyzer_keyword": { + "filter": "lowercase", + "tokenizer": "keyword" + } + } + }, + "number_of_replicas": "0" + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/summary_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/summary_index.json new file mode 100644 index 000000000..105098543 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/summary_index.json @@ -0,0 +1,132 @@ +{ + "mappings": { + "properties": { + "abstract": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "author": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "datasources": { + "type": "nested", + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "datasourceId": { + "type": "keyword" + }, + "datasourceName": { + "type": "keyword" + } + } + }, + "date": { + "type": "keyword" + }, + "id": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "localIdentifier": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "type": { + "type": "keyword" + } + } + }, + "publisher": { + "type": "keyword" + }, + "relatedDatasets": { + "type": "long" + }, + "relatedPublications": { + "type": "long" + }, + "relatedUnknown": { + "type": "long" + }, + "subject": { + "properties": { + "scheme": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "value": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "typology": { + "type": "keyword" + } + } + }, + "settings": { + "index": { + "refresh_interval": "600s", + "number_of_shards": "48", + "translog": { + "sync_interval": "15s", + "durability": "ASYNC" + }, + "analysis": { + "analyzer": { + "analyzer_keyword": { + "filter": "lowercase", + "tokenizer": "keyword" + } + } + }, + "number_of_replicas": "0" + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java index 12e91a72c..be06380f7 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java @@ -12,12 +12,10 @@ import scala.Tuple2; public class ExtractInfoTest { - @Test - public void test() throws Exception { - final String json = IOUtils.toString(getClass().getResourceAsStream("record.json")); - ProvisionUtil.getItemType(json,ProvisionUtil.TARGETJSONPATH); - } + + + @Test @@ -36,23 +34,20 @@ public class ExtractInfoTest { public void testScholix() throws Exception { final String jsonSummary = IOUtils.toString(getClass().getResourceAsStream("summary.json")); final String jsonRelation = IOUtils.toString(getClass().getResourceAsStream("relation.json")); - Scholix.generateScholixWithSource(jsonSummary, jsonRelation); - - } @Test - @Ignore + public void testIndex() throws Exception { - SparkIndexCollectionOnES.main( + SparkGenerateScholix.main( new String[] { "-mt", "local[*]", - "-s", "/home/sandro/dli", - "-i", "dli_object" + "-w", "/Users/sandro/Downloads/scholix/provision", + "-g", "/Users/sandro/Downloads/scholix/graph" } ); }