From 718bc7bbc8c273a9e1d06ab254ffc2c15820c8cb Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 7 Aug 2020 11:05:18 +0200 Subject: [PATCH] implemented provision workflows using the new implementation with Dataset --- .../provision/SparkExtractRelationCount.java | 34 ------ ....scala => SparkExtractRelationCount.scala} | 30 ++++- .../dhp/provision/SparkGenerateScholix.java | 109 ------------------ .../provision/SparkGenerateScholixIndex.scala | 62 ++++++++++ .../dhp/provision/SparkGenerateSummary.java | 106 ----------------- .../provision/SparkGenerateSummaryIndex.scala | 70 +++++++++++ .../scholix/summary/ScholixSummary.java | 50 ++------ .../eu/dnetlib/dhp/provision/cluster.json | 4 + .../dhp/sx/provision/oozie_app/workflow.xml | 62 +--------- 9 files changed, 175 insertions(+), 352 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{DatasetJoiner.scala => SparkExtractRelationCount.scala} (53%) delete mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala delete mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummaryIndex.scala create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/cluster.json diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java deleted file mode 100644 index df167f104..000000000 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.java +++ /dev/null @@ -1,34 +0,0 @@ - -package eu.dnetlib.dhp.provision; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.sql.*; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -/** - * SparkExtractRelationCount is a spark job that takes in input relation RDD and retrieve for each item in relation - * which are the number of - Related Dataset - Related Publication - Related Unknown - */ -public class SparkExtractRelationCount { - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkExtractRelationCount.class - .getResourceAsStream( - "/eu/dnetlib/dhp/provision/input_related_entities_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkExtractRelationCount.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - - final String workingDirPath = parser.get("workingDirPath"); - - final String relationPath = parser.get("relationPath"); - DatasetJoiner.startJoin(spark, relationPath, workingDirPath + "/relatedItemCount"); - } -} diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.scala similarity index 53% rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.scala index afc33c34a..d6e36ac87 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkExtractRelationCount.scala @@ -1,12 +1,32 @@ package eu.dnetlib.dhp.provision -import org.apache.spark.sql.SparkSession +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Relation +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.functions.{coalesce, col, count, lit} -object DatasetJoiner { - def startJoin(spark: SparkSession, relPath:String, targetPath:String) { - val relation = spark.read.load(relPath) +/** + * SparkExtractRelationCount is a spark job that takes in input relation RDD and retrieve for each item in relation + * which are the number of - Related Dataset - Related Publication - Related Unknown + */ +object SparkExtractRelationCount { + + + def main(args: Array[String]): Unit = { + + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExtractRelationCount.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_related_entities_parameters.json"))) + parser.parseArgument(args) + val spark = SparkSession.builder.appName(SparkExtractRelationCount.getClass.getSimpleName).master(parser.get("master")).getOrCreate + + val workingDirPath = parser.get("workingDirPath") + + val relationPath = parser.get("relationPath") + + implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation] + + val relation = spark.read.load(relationPath).as[Relation].map(r =>r)(Encoders.bean(classOf[Relation])) val relatedPublication = relation .where("target like '50%'") @@ -34,7 +54,7 @@ object DatasetJoiner { coalesce(col("dataset"),lit(0)).alias("relatedDataset"), coalesce(col("unknown"),lit(0)).alias("relatedUnknown") ) - firstJoin.write.mode("overwrite").save(targetPath) + firstJoin.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/relatedItemCount") } } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java deleted file mode 100644 index f9f3a58ce..000000000 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java +++ /dev/null @@ -1,109 +0,0 @@ - -package eu.dnetlib.dhp.provision; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.provision.scholix.*; -import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; -import eu.dnetlib.dhp.schema.oaf.Relation; -import scala.Tuple2; - -public class SparkGenerateScholix { - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkGenerateScholix.class - .getResourceAsStream( - "/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))); - parser.parseArgument(args); - SparkConf conf = new SparkConf(); - conf.set("spark.sql.shuffle.partitions", "4000"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - final SparkSession spark = SparkSession - .builder() - .config(conf) - .appName(SparkExtractRelationCount.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - - conf - .registerKryoClasses( - new Class[] { - Scholix.class, ScholixCollectedFrom.class, ScholixEntityId.class, - ScholixIdentifier.class, ScholixRelationship.class, ScholixResource.class - }); - - final String graphPath = parser.get("graphPath"); - final String workingDirPath = parser.get("workingDirPath"); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - final Dataset scholixSummary = spark - .read() - .load(workingDirPath + "/summary") - .as(Encoders.bean(ScholixSummary.class)); - final Dataset rels = spark.read().load(graphPath + "/relation").as(Encoders.bean(Relation.class)); - - Dataset firstJoin = scholixSummary - .joinWith(rels, scholixSummary.col("id").equalTo(rels.col("source"))) - .map( - (MapFunction, Scholix>) f -> Scholix - .generateScholixWithSource(f._1(), f._2()), - Encoders.bean(Scholix.class)); - - firstJoin.write().mode(SaveMode.Overwrite).save(workingDirPath + "/scholix_1"); - - Dataset scholix_final = spark - .read() - .load(workingDirPath + "/scholix_1") - .as(Encoders.bean(Scholix.class)); - - scholixSummary - .map( - (MapFunction) ScholixResource::fromSummary, - Encoders.bean(ScholixResource.class)) - .repartition(1000) - .write() - .mode(SaveMode.Overwrite) - .save(workingDirPath + "/scholix_target"); - - Dataset target = spark - .read() - .load(workingDirPath + "/scholix_target") - .as(Encoders.bean(ScholixResource.class)); - - scholix_final - .joinWith( - target, scholix_final.col("identifier").equalTo(target.col("dnetIdentifier")), "inner") - .map( - (MapFunction, Scholix>) f -> { - final Scholix scholix = f._1(); - final ScholixResource scholixTarget = f._2(); - scholix.setTarget(scholixTarget); - scholix.generateIdentifier(); - scholix.generatelinkPublisher(); - return scholix; - }, - Encoders.kryo(Scholix.class)) - .javaRDD() - .map( - s -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(s); - }) - .saveAsTextFile(workingDirPath + "/scholix_json", GzipCodec.class); - } -} diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala new file mode 100644 index 000000000..dbf6de05f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala @@ -0,0 +1,62 @@ +package eu.dnetlib.dhp.provision + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.provision.scholix.{Scholix, ScholixResource} +import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary +import eu.dnetlib.dhp.schema.oaf.Relation +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} + +object SparkGenerateScholixIndex { + + + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))) + parser.parseArgument(args) + val conf = new SparkConf + conf.set("spark.sql.shuffle.partitions", "4000") + val spark = SparkSession.builder.config(conf).appName(SparkGenerateScholixIndex.getClass.getSimpleName).master(parser.get("master")).getOrCreate + + val graphPath = parser.get("graphPath") + val workingDirPath = parser.get("workingDirPath") + + + implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] + implicit val relEncoder:Encoder[Relation] = Encoders.kryo[Relation] + implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix] + implicit val tupleScholix:Encoder[(String,Scholix)]=Encoders.tuple(Encoders.STRING, scholixEncoder) + + + val scholixSummary:Dataset[(String,ScholixSummary)] = spark.read.load(s"$workingDirPath/summary").as[ScholixSummary] + .map(s => (s.getId, s))(Encoders.tuple(Encoders.STRING, summaryEncoder)) + val sourceRelations:Dataset[(String,Relation)]= spark.read.load(s"$graphPath/relation").as[Relation] + .map(r => (r.getSource,r))(Encoders.tuple(Encoders.STRING, relEncoder)) + + scholixSummary.joinWith(sourceRelations, scholixSummary("_1").equalTo(sourceRelations("_1")), "inner") + .map(r=> { + val summary = r._1._2 + val relation = r._2._2 + + (relation.getTarget, Scholix.generateScholixWithSource(summary,relation)) + + }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source") + + val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)] + + sTarget.joinWith(scholixSummary, sTarget("_1").equalTo(scholixSummary("_1")), "inner").map(i => { + val summary = i._2._2 + val scholix = i._1._2 + + val scholixResource = ScholixResource.fromSummary(summary) + scholix.setTarget(scholixResource) + scholix.generateIdentifier() + scholix.generatelinkPublisher() + scholix + }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix") + + + + } + +} diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java deleted file mode 100644 index 04bde1099..000000000 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummary.java +++ /dev/null @@ -1,106 +0,0 @@ - -package eu.dnetlib.dhp.provision; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; -import eu.dnetlib.dhp.utils.DHPUtils; -import scala.Tuple2; - -public class SparkGenerateSummary { - - private static final String jsonIDPath = "$.id"; - - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkGenerateSummary.class - .getResourceAsStream( - "/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkExtractRelationCount.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - - final String graphPath = parser.get("graphPath"); - final String workingDirPath = parser.get("workingDirPath"); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - Dataset rInfo = spark - .read() - .load(workingDirPath + "/relatedItemCount") - .as(Encoders.bean(RelatedItemInfo.class)); - - Dataset entity = spark - .createDataset( - sc - .textFile( - graphPath + "/publication," + graphPath + "/dataset," + graphPath + "/unknown") - .map( - s -> ScholixSummary - .fromJsonOAF( - ProvisionUtil.getItemTypeFromId(DHPUtils.getJPathString(jsonIDPath, s)), - s)) - .rdd(), - Encoders.bean(ScholixSummary.class)); - - Dataset summaryComplete = rInfo - .joinWith(entity, rInfo.col("source").equalTo(entity.col("id"))) - .map( - (MapFunction, ScholixSummary>) t -> { - ScholixSummary scholixSummary = t._2(); - RelatedItemInfo relatedItemInfo = t._1(); - scholixSummary.setRelatedDatasets(relatedItemInfo.getRelatedDataset()); - scholixSummary - .setRelatedPublications( - relatedItemInfo.getRelatedPublication()); - scholixSummary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown()); - return scholixSummary; - }, - Encoders.bean(ScholixSummary.class)); - - summaryComplete.write().save(workingDirPath + "/summary"); - - // JavaPairRDD relationCount = - // sc.textFile(workingDirPath+"/relatedItemCount").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); - // - // JavaPairRDD entities = - // sc.textFile(graphPath + "/publication") - // .filter(ProvisionUtil::isNotDeleted) - // .mapToPair((PairFunction) i -> new - // Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - // .union( - // sc.textFile(graphPath + "/dataset") - // .filter(ProvisionUtil::isNotDeleted) - // .mapToPair((PairFunction) - // i -> - // new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - // ) - // .union( - // sc.textFile(graphPath + "/unknown") - // .filter(ProvisionUtil::isNotDeleted) - // .mapToPair((PairFunction) - // i -> - // new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)) - // ); - // entities.join(relationCount).map((Function>, - // String>) k -> - // ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(k._1()), - // k._2()._1(), k._2()._2())).saveAsTextFile(workingDirPath+"/summary", GzipCodec.class); - // - // - // ; - - } -} diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummaryIndex.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummaryIndex.scala new file mode 100644 index 000000000..bf3d0342b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateSummaryIndex.scala @@ -0,0 +1,70 @@ +package eu.dnetlib.dhp.provision + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation} +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} + +object SparkGenerateSummaryIndex { + + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateSummaryIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))) + parser.parseArgument(args) + val spark = SparkSession.builder.appName(SparkGenerateSummaryIndex.getClass.getSimpleName).master(parser.get("master")).getOrCreate + + val graphPath = parser.get("graphPath") + val workingDirPath = parser.get("workingDirPath") + + implicit val relatedItemInfoEncoders: Encoder[RelatedItemInfo] = Encoders.bean(classOf[RelatedItemInfo]) + implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset] + implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication] + implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val oafWithIdEncoder: Encoder[(String, Oaf)] = Encoders.tuple(Encoders.STRING, oafEncoder) + implicit val scholixSummaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] + implicit val scholixSummaryEncoderTuple: Encoder[(String,ScholixSummary)] = Encoders.tuple(Encoders.STRING,scholixSummaryEncoder) + + + val pubs = spark.read.load(s"$graphPath/publication").as[Oaf].map(o => (o.asInstanceOf[DLIPublication].getId, o)) + val dats = spark.read.load(s"$graphPath/dataset").as[Oaf].map(o => (o.asInstanceOf[DLIDataset].getId, o)) + val ukn = spark.read.load(s"$graphPath/unknown").as[Oaf].map(o => (o.asInstanceOf[DLIUnknown].getId, o)) + + + val summary:Dataset[(String,ScholixSummary)] = pubs.union(dats).union(ukn).map(o =>{ + val s = ScholixSummary.fromOAF(o._2) + (s.getId,s) + }) + + + val relatedItemInfoDs:Dataset[RelatedItemInfo] = spark.read.load(s"$workingDirPath/relatedItemCount").as[RelatedItemInfo] + + + summary.joinWith(relatedItemInfoDs, summary("_1").equalTo(relatedItemInfoDs("source")), "inner") + .map(i => { + val summary = i._1._2 + val relatedItemInfo = i._2 + summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset) + summary.setRelatedPublications(relatedItemInfo.getRelatedPublication) + summary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown) + summary + }).filter(s => s.getLocalIdentifier != null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/summary") + + + + + + + + + + + + + + + + } + +} diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java index e5ea8b9f5..3b808ba51 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/summary/ScholixSummary.java @@ -12,6 +12,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.RelatedItemInfo; import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; @@ -138,54 +140,20 @@ public class ScholixSummary implements Serializable { this.datasources = datasources; } - public static ScholixSummary fromJsonOAF(final Typology oafType, final String oafJson) { + public static ScholixSummary fromOAF(final Oaf oaf) { try { - final ObjectMapper mapper = new ObjectMapper(); final RelatedItemInfo relatedItemInfo = new RelatedItemInfo(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - switch (oafType) { - case dataset: - return summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo); - case publication: - return summaryFromPublication( - mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo); - case unknown: - return summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo); - } - } catch (Throwable e) { - throw new RuntimeException(e); - } - return null; - } - public static String fromJsonOAF( - final Typology oafType, final String oafJson, final String relEntityJson) { - try { - final ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - RelatedItemInfo relatedItemInfo = mapper.readValue(relEntityJson, RelatedItemInfo.class); - - switch (oafType) { - case dataset: - return mapper - .writeValueAsString( - summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo)); - case publication: - return mapper - .writeValueAsString( - summaryFromPublication( - mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo)); - case unknown: - return mapper - .writeValueAsString( - summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo)); - } + if (oaf instanceof DLIPublication) + return summaryFromPublication((DLIPublication) oaf, relatedItemInfo); + if (oaf instanceof DLIDataset) + return summaryFromDataset((DLIDataset) oaf, relatedItemInfo); + if (oaf instanceof DLIUnknown) + return summaryFromUnknown((DLIUnknown) oaf, relatedItemInfo); } catch (Throwable e) { throw new RuntimeException(e); } - return null; } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/cluster.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/cluster.json new file mode 100644 index 000000000..1cea6a8b9 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/cluster.json @@ -0,0 +1,4 @@ +{ + "cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54", + "cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml index 68543e3f2..7ce35cee2 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml @@ -16,18 +16,6 @@ sparkExecutorMemory memory for individual executor - - - - - - - - - - - - @@ -53,7 +41,7 @@ calculate for each ID the number of related Dataset, publication and Unknown eu.dnetlib.dhp.provision.SparkExtractRelationCount dhp-graph-provision-scholexplorer-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + --executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} -mt yarn-cluster --workingDirPath${workingDirPath} --relationPath${graphPath}/relation @@ -69,9 +57,9 @@ yarn-cluster cluster generate Summary - eu.dnetlib.dhp.provision.SparkGenerateSummary + eu.dnetlib.dhp.provision.SparkGenerateSummaryIndex dhp-graph-provision-scholexplorer-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + --executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT} -mt yarn-cluster --workingDirPath${workingDirPath} --graphPath${graphPath} @@ -87,9 +75,9 @@ yarn-cluster cluster generate Scholix - eu.dnetlib.dhp.provision.SparkGenerateScholix + eu.dnetlib.dhp.provision.SparkGenerateScholixIndex dhp-graph-provision-scholexplorer-${projectVersion}.jar - --executor-memory 6G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + --executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT} -mt yarn-cluster --workingDirPath${workingDirPath} --graphPath${graphPath} @@ -98,45 +86,5 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file