diff --git a/.gitignore b/.gitignore
index 4ee86c120..28ec2ec19 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@
 *.ipr
 *.iml
 *~
+.vscode
 .classpath
 /*/.classpath
 /*/*/.classpath
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java
index 104cefce2..58a98e490 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java
@@ -1,55 +1,30 @@
 package eu.dnetlib.dhp.provision;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.provision.scholix.*;
-import eu.dnetlib.dhp.provision.scholix.summary.*;
+import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.sql.*;
-
-import static org.apache.spark.sql.functions.col;
-
-import scala.Int;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 public class SparkGenerateScholix {
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholix.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")));
         parser.parseArgument(args);
-
-
         SparkConf conf = new SparkConf();
         conf.set("spark.sql.shuffle.partitions","4000");
-//        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-//        conf.registerKryoClasses(new Class[]{
-//                ScholixSummary.class,
-//                CollectedFromType.class,
-//                SchemeValue.class,
-//                TypedIdentifier.class,
-//                Typology.class,
-//                Relation.class,
-//                Scholix.class,
-//                ScholixCollectedFrom.class,
-//                ScholixEntityId.class,
-//                ScholixIdentifier.class,
-//                ScholixRelationship.class,
-//                ScholixResource.class
-//        });
-
-
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         final SparkSession spark = SparkSession
                 .builder()
                 .config(conf)
@@ -57,6 +32,16 @@ public class SparkGenerateScholix {
                 .master(parser.get("master"))
                 .getOrCreate();
+        conf.registerKryoClasses(new Class[]{
+                Scholix.class,
+                ScholixCollectedFrom.class,
+                ScholixEntityId.class,
+                ScholixIdentifier.class,
+                ScholixRelationship.class,
+                ScholixResource.class
+        });
+
+
         final String graphPath = parser.get("graphPath");
         final String workingDirPath = parser.get("workingDirPath");
@@ -71,12 +56,16 @@
                 .map((MapFunction<Tuple2<ScholixSummary, Relation>, Scholix>) f -> Scholix.generateScholixWithSource(f._1(), f._2()), Encoders.bean(Scholix.class));
         firstJoin.write().mode(SaveMode.Overwrite).save(workingDirPath+"/scholix_1");
-        firstJoin = spark.read().load(workingDirPath+"/scholix_1").as(Encoders.bean(Scholix.class));
-
-
         Dataset<Scholix> scholix_final = spark.read().load(workingDirPath+"/scholix_1").as(Encoders.bean(Scholix.class));
+        scholixSummary
+                .map((MapFunction<ScholixSummary, ScholixResource>) ScholixResource::fromSummary, Encoders.bean(ScholixResource.class))
+                .repartition(1000)
+                .write()
+                .mode(SaveMode.Overwrite)
+                .save(workingDirPath+"/scholix_target");
+        Dataset<ScholixResource> target = spark.read().load(workingDirPath+"/scholix_target").as(Encoders.bean(ScholixResource.class));
         scholix_final.joinWith(target, scholix_final.col("identifier").equalTo(target.col("dnetIdentifier")), "inner")
@@ -87,6 +76,9 @@ public class SparkGenerateScholix {
                     scholix.generateIdentifier();
                     scholix.generatelinkPublisher();
                     return scholix;
-                }, Encoders.bean(Scholix.class)).repartition(5000).write().mode(SaveMode.Overwrite).save(workingDirPath+"/scholix_index");
+                }, Encoders.kryo(Scholix.class)).javaRDD().map(s-> {
+            ObjectMapper mapper = new ObjectMapper();
+            return mapper.writeValueAsString(s);
+        }).saveAsTextFile(workingDirPath+"/scholix_json", GzipCodec.class);
     }
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml
index 1102ec4c1..0c22fbdbf 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml
@@ -33,11 +33,9 @@
     idSummary
     number of cores used by single executor
-
-
-
+
     Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
@@ -96,12 +94,12 @@
     generate Scholix
     eu.dnetlib.dhp.provision.SparkGenerateScholix
     dhp-graph-provision-${projectVersion}.jar
-    --executor-memory 9G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
+    --executor-memory 6G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
     -mt yarn-cluster
     --workingDirPath${workingDirPath}
     --graphPath${graphPath}
-
+
@@ -111,7 +109,7 @@
     ${nameNode}
     yarn-cluster
     cluster
-    generate Summary
+    index Summary
     eu.dnetlib.dhp.provision.SparkIndexCollectionOnES
     dhp-graph-provision-${projectVersion}.jar
     --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="64"
@@ -134,7 +132,7 @@
     index scholix
     eu.dnetlib.dhp.provision.SparkIndexCollectionOnES
     dhp-graph-provision-${projectVersion}.jar
-    --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="16"
+    --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"
     -mt yarn-cluster
     --sourcePath${workingDirPath}/scholix_json
     --index${index}_scholix
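For reference, the sketch below is not part of the patch: it shows, in isolation, the two patterns the Java hunks rely on, namely enabling Kryo serialization on the SparkConf and writing a Dataset out as gzip-compressed JSON lines with Jackson and GzipCodec. The Record bean and the /tmp paths are hypothetical placeholders for Scholix and ${workingDirPath}.

// Minimal, self-contained sketch (hypothetical Record bean and paths, not the project classes).
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class GzipJsonWriteSketch {

    // Simple bean standing in for Scholix; any POJO with getters/setters works with Encoders.bean.
    public static class Record implements Serializable {
        private String identifier;
        public String getIdentifier() { return identifier; }
        public void setIdentifier(String identifier) { this.identifier = identifier; }
    }

    public static void main(String[] args) {
        // Kryo serializer and class registration are set on the conf before the session is
        // created, so the settings are picked up when the session starts.
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(new Class[]{Record.class});

        SparkSession spark = SparkSession.builder()
                .config(conf)
                .appName("gzip-json-write-sketch")
                .master("local[*]")
                .getOrCreate();

        // Read a previously saved parquet dataset back as beans
        // (mirrors reading workingDirPath + "/scholix_1" in the patch).
        Dataset<Record> records = spark.read()
                .load("/tmp/records_parquet")              // hypothetical input path
                .as(Encoders.bean(Record.class));

        // Serialize each row to a JSON line and let Hadoop gzip the part files,
        // the same shape as the new workingDirPath + "/scholix_json" output.
        records.toJavaRDD()
                .map(r -> new ObjectMapper().writeValueAsString(r))
                .saveAsTextFile("/tmp/records_json", GzipCodec.class);

        spark.stop();
    }
}

Emitting JSON text rather than a bean-encoded dataset matches the updated workflow, where the "index scholix" action now points SparkIndexCollectionOnES at ${workingDirPath}/scholix_json.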