Implemented the provision workflows using the new Dataset-based implementation

Sandro La Bruzzo 2020-08-07 11:05:18 +02:00
parent a44e5abaa7
commit 718bc7bbc8
9 changed files with 175 additions and 352 deletions

View File

@ -1,34 +0,0 @@
package eu.dnetlib.dhp.provision;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.*;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* SparkExtractRelationCount is a Spark job that takes a relation RDD as input and retrieves, for each item in the relation,
* the number of related Datasets, related Publications and related Unknowns
*/
public class SparkExtractRelationCount {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkExtractRelationCount.class
.getResourceAsStream(
"/eu/dnetlib/dhp/provision/input_related_entities_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkExtractRelationCount.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final String workingDirPath = parser.get("workingDirPath");
final String relationPath = parser.get("relationPath");
DatasetJoiner.startJoin(spark, relationPath, workingDirPath + "/relatedItemCount");
}
}

View File

@ -1,12 +1,32 @@
 package eu.dnetlib.dhp.provision
-import org.apache.spark.sql.SparkSession
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Relation
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.{coalesce, col, count, lit}
-object DatasetJoiner {
-def startJoin(spark: SparkSession, relPath:String, targetPath:String) {
-val relation = spark.read.load(relPath)
+/**
+* SparkExtractRelationCount is a Spark job that takes the relation Dataset as input and retrieves, for each item in the relation,
+* the number of related Datasets, related Publications and related Unknowns
+*/
+object SparkExtractRelationCount {
+def main(args: Array[String]): Unit = {
+val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExtractRelationCount.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_related_entities_parameters.json")))
+parser.parseArgument(args)
+val spark = SparkSession.builder.appName(SparkExtractRelationCount.getClass.getSimpleName).master(parser.get("master")).getOrCreate
+val workingDirPath = parser.get("workingDirPath")
+val relationPath = parser.get("relationPath")
+implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
+val relation = spark.read.load(relationPath).as[Relation].map(r =>r)(Encoders.bean(classOf[Relation]))
 val relatedPublication = relation
 .where("target like '50%'")
@ -34,7 +54,7 @@ object DatasetJoiner {
 coalesce(col("dataset"),lit(0)).alias("relatedDataset"),
 coalesce(col("unknown"),lit(0)).alias("relatedUnknown")
 )
-firstJoin.write.mode("overwrite").save(targetPath)
+firstJoin.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/relatedItemCount")
 }
 }

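Note: the hunks above elide the middle of the job. For orientation, here is a minimal, self-contained sketch of the count-then-join pattern the visible lines suggest; the '60%'/'70%' target prefixes, the column aliases and the object name are illustrative assumptions, not the committed code.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, count, lit}

object RelatedItemCountSketch {

  def run(spark: SparkSession, relationPath: String, workingDirPath: String): Unit = {
    val relation = spark.read.load(relationPath)

    // Count, per source id, the related entities of each type; the type is assumed
    // to be encoded in the prefix of the target id ('50' publication, '60' dataset, '70' unknown).
    val relatedPublication = relation.where("target like '50%'")
      .groupBy("source").agg(count("target").alias("publication"))
    val relatedDataset = relation.where("target like '60%'")
      .groupBy("source").agg(count("target").alias("dataset"))
    val relatedUnknown = relation.where("target like '70%'")
      .groupBy("source").agg(count("target").alias("unknown"))

    // Full outer join of the three counts, defaulting missing counts to 0,
    // mirroring the coalesce(..., lit(0)) projection visible in the diff.
    val firstJoin = relatedPublication
      .join(relatedDataset, Seq("source"), "full")
      .join(relatedUnknown, Seq("source"), "full")
      .select(
        col("source"),
        coalesce(col("publication"), lit(0)).alias("relatedPublication"),
        coalesce(col("dataset"), lit(0)).alias("relatedDataset"),
        coalesce(col("unknown"), lit(0)).alias("relatedUnknown"))

    firstJoin.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/relatedItemCount")
  }
}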
View File

@ -1,109 +0,0 @@
package eu.dnetlib.dhp.provision;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.*;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class SparkGenerateScholix {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkGenerateScholix.class
.getResourceAsStream(
"/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "4000");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
final SparkSession spark = SparkSession
.builder()
.config(conf)
.appName(SparkExtractRelationCount.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
conf
.registerKryoClasses(
new Class[] {
Scholix.class, ScholixCollectedFrom.class, ScholixEntityId.class,
ScholixIdentifier.class, ScholixRelationship.class, ScholixResource.class
});
final String graphPath = parser.get("graphPath");
final String workingDirPath = parser.get("workingDirPath");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final Dataset<ScholixSummary> scholixSummary = spark
.read()
.load(workingDirPath + "/summary")
.as(Encoders.bean(ScholixSummary.class));
final Dataset<Relation> rels = spark.read().load(graphPath + "/relation").as(Encoders.bean(Relation.class));
Dataset<Scholix> firstJoin = scholixSummary
.joinWith(rels, scholixSummary.col("id").equalTo(rels.col("source")))
.map(
(MapFunction<Tuple2<ScholixSummary, Relation>, Scholix>) f -> Scholix
.generateScholixWithSource(f._1(), f._2()),
Encoders.bean(Scholix.class));
firstJoin.write().mode(SaveMode.Overwrite).save(workingDirPath + "/scholix_1");
Dataset<Scholix> scholix_final = spark
.read()
.load(workingDirPath + "/scholix_1")
.as(Encoders.bean(Scholix.class));
scholixSummary
.map(
(MapFunction<ScholixSummary, ScholixResource>) ScholixResource::fromSummary,
Encoders.bean(ScholixResource.class))
.repartition(1000)
.write()
.mode(SaveMode.Overwrite)
.save(workingDirPath + "/scholix_target");
Dataset<ScholixResource> target = spark
.read()
.load(workingDirPath + "/scholix_target")
.as(Encoders.bean(ScholixResource.class));
scholix_final
.joinWith(
target, scholix_final.col("identifier").equalTo(target.col("dnetIdentifier")), "inner")
.map(
(MapFunction<Tuple2<Scholix, ScholixResource>, Scholix>) f -> {
final Scholix scholix = f._1();
final ScholixResource scholixTarget = f._2();
scholix.setTarget(scholixTarget);
scholix.generateIdentifier();
scholix.generatelinkPublisher();
return scholix;
},
Encoders.kryo(Scholix.class))
.javaRDD()
.map(
s -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(s);
})
.saveAsTextFile(workingDirPath + "/scholix_json", GzipCodec.class);
}
}

View File

@ -0,0 +1,62 @@
package eu.dnetlib.dhp.provision
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.provision.scholix.{Scholix, ScholixResource}
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
object SparkGenerateScholixIndex {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")))
parser.parseArgument(args)
val conf = new SparkConf
conf.set("spark.sql.shuffle.partitions", "4000")
val spark = SparkSession.builder.config(conf).appName(SparkGenerateScholixIndex.getClass.getSimpleName).master(parser.get("master")).getOrCreate
val graphPath = parser.get("graphPath")
val workingDirPath = parser.get("workingDirPath")
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val relEncoder:Encoder[Relation] = Encoders.kryo[Relation]
implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val tupleScholix:Encoder[(String,Scholix)]=Encoders.tuple(Encoders.STRING, scholixEncoder)
val scholixSummary:Dataset[(String,ScholixSummary)] = spark.read.load(s"$workingDirPath/summary").as[ScholixSummary]
.map(s => (s.getId, s))(Encoders.tuple(Encoders.STRING, summaryEncoder))
val sourceRelations:Dataset[(String,Relation)]= spark.read.load(s"$graphPath/relation").as[Relation]
.map(r => (r.getSource,r))(Encoders.tuple(Encoders.STRING, relEncoder))
scholixSummary.joinWith(sourceRelations, scholixSummary("_1").equalTo(sourceRelations("_1")), "inner")
.map(r=> {
val summary = r._1._2
val relation = r._2._2
(relation.getTarget, Scholix.generateScholixWithSource(summary,relation))
}).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source")
val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)]
sTarget.joinWith(scholixSummary, sTarget("_1").equalTo(scholixSummary("_1")), "inner").map(i => {
val summary = i._2._2
val scholix = i._1._2
val scholixResource = ScholixResource.fromSummary(summary)
scholix.setTarget(scholixResource)
scholix.generateIdentifier()
scholix.generatelinkPublisher()
scholix
}).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix")
}
}
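Note on the (String, T) tuples above: kryo-encoded objects are stored as a single binary column, so a join condition cannot reference their fields; keying each object by its identifier gives the join a concrete "_1" column to compare. A minimal sketch of that pattern, assuming hypothetical Item and Rel classes that are not part of the codebase:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical stand-ins for the kryo-encoded Scholix model objects.
case class Item(id: String, title: String)
case class Rel(source: String, target: String)

object KeyedKryoJoinSketch {

  def run(spark: SparkSession, itemPath: String, relPath: String): Dataset[(Item, Rel)] = {
    implicit val itemEncoder: Encoder[Item] = Encoders.kryo[Item]
    implicit val relEncoder: Encoder[Rel] = Encoders.kryo[Rel]

    // Re-key every kryo-encoded object by its identifier: the resulting Dataset
    // has concrete "_1"/"_2" columns, so the equalTo("_1") condition below is resolvable.
    val keyedItems: Dataset[(String, Item)] = spark.read.load(itemPath).as[Item]
      .map(i => (i.id, i))(Encoders.tuple(Encoders.STRING, itemEncoder))
    val keyedRels: Dataset[(String, Rel)] = spark.read.load(relPath).as[Rel]
      .map(r => (r.source, r))(Encoders.tuple(Encoders.STRING, relEncoder))

    keyedItems
      .joinWith(keyedRels, keyedItems("_1").equalTo(keyedRels("_1")), "inner")
      .map(pair => (pair._1._2, pair._2._2))(Encoders.tuple(itemEncoder, relEncoder))
  }
}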

View File

@ -1,106 +0,0 @@
package eu.dnetlib.dhp.provision;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
public class SparkGenerateSummary {
private static final String jsonIDPath = "$.id";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkGenerateSummary.class
.getResourceAsStream(
"/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkExtractRelationCount.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final String graphPath = parser.get("graphPath");
final String workingDirPath = parser.get("workingDirPath");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<RelatedItemInfo> rInfo = spark
.read()
.load(workingDirPath + "/relatedItemCount")
.as(Encoders.bean(RelatedItemInfo.class));
Dataset<ScholixSummary> entity = spark
.createDataset(
sc
.textFile(
graphPath + "/publication," + graphPath + "/dataset," + graphPath + "/unknown")
.map(
s -> ScholixSummary
.fromJsonOAF(
ProvisionUtil.getItemTypeFromId(DHPUtils.getJPathString(jsonIDPath, s)),
s))
.rdd(),
Encoders.bean(ScholixSummary.class));
Dataset<ScholixSummary> summaryComplete = rInfo
.joinWith(entity, rInfo.col("source").equalTo(entity.col("id")))
.map(
(MapFunction<Tuple2<RelatedItemInfo, ScholixSummary>, ScholixSummary>) t -> {
ScholixSummary scholixSummary = t._2();
RelatedItemInfo relatedItemInfo = t._1();
scholixSummary.setRelatedDatasets(relatedItemInfo.getRelatedDataset());
scholixSummary
.setRelatedPublications(
relatedItemInfo.getRelatedPublication());
scholixSummary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown());
return scholixSummary;
},
Encoders.bean(ScholixSummary.class));
summaryComplete.write().save(workingDirPath + "/summary");
// JavaPairRDD<String, String> relationCount =
// sc.textFile(workingDirPath+"/relatedItemCount").mapToPair((PairFunction<String, String,
// String>) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i));
//
// JavaPairRDD<String, String> entities =
// sc.textFile(graphPath + "/publication")
// .filter(ProvisionUtil::isNotDeleted)
// .mapToPair((PairFunction<String, String, String>) i -> new
// Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i))
// .union(
// sc.textFile(graphPath + "/dataset")
// .filter(ProvisionUtil::isNotDeleted)
// .mapToPair((PairFunction<String, String, String>)
// i ->
// new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i))
// )
// .union(
// sc.textFile(graphPath + "/unknown")
// .filter(ProvisionUtil::isNotDeleted)
// .mapToPair((PairFunction<String, String, String>)
// i ->
// new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i))
// );
// entities.join(relationCount).map((Function<Tuple2<String, Tuple2<String, String>>,
// String>) k ->
// ScholixSummary.fromJsonOAF(ProvisionUtil.getItemTypeFromId(k._1()),
// k._2()._1(), k._2()._2())).saveAsTextFile(workingDirPath+"/summary", GzipCodec.class);
//
//
// ;
}
}

View File

@ -0,0 +1,70 @@
package eu.dnetlib.dhp.provision
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
object SparkGenerateSummaryIndex {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateSummaryIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")))
parser.parseArgument(args)
val spark = SparkSession.builder.appName(SparkGenerateSummaryIndex.getClass.getSimpleName).master(parser.get("master")).getOrCreate
val graphPath = parser.get("graphPath")
val workingDirPath = parser.get("workingDirPath")
implicit val relatedItemInfoEncoders: Encoder[RelatedItemInfo] = Encoders.bean(classOf[RelatedItemInfo])
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val oafWithIdEncoder: Encoder[(String, Oaf)] = Encoders.tuple(Encoders.STRING, oafEncoder)
implicit val scholixSummaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val scholixSummaryEncoderTuple: Encoder[(String,ScholixSummary)] = Encoders.tuple(Encoders.STRING,scholixSummaryEncoder)
val pubs = spark.read.load(s"$graphPath/publication").as[Oaf].map(o => (o.asInstanceOf[DLIPublication].getId, o))
val dats = spark.read.load(s"$graphPath/dataset").as[Oaf].map(o => (o.asInstanceOf[DLIDataset].getId, o))
val ukn = spark.read.load(s"$graphPath/unknown").as[Oaf].map(o => (o.asInstanceOf[DLIUnknown].getId, o))
val summary:Dataset[(String,ScholixSummary)] = pubs.union(dats).union(ukn).map(o =>{
val s = ScholixSummary.fromOAF(o._2)
(s.getId,s)
})
val relatedItemInfoDs:Dataset[RelatedItemInfo] = spark.read.load(s"$workingDirPath/relatedItemCount").as[RelatedItemInfo]
summary.joinWith(relatedItemInfoDs, summary("_1").equalTo(relatedItemInfoDs("source")), "inner")
.map(i => {
val summary = i._1._2
val relatedItemInfo = i._2
summary.setRelatedDatasets(relatedItemInfo.getRelatedDataset)
summary.setRelatedPublications(relatedItemInfo.getRelatedPublication)
summary.setRelatedUnknown(relatedItemInfo.getRelatedUnknown)
summary
}).filter(s => s.getLocalIdentifier != null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/summary")
}
}
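Note: unlike the kryo-encoded summaries, RelatedItemInfo is read with Encoders.bean, which exposes the bean's getter/setter properties as named columns; that is why the join above can reference relatedItemInfoDs("source") directly while the summaries are re-keyed as (id, summary) tuples. A small sketch of the bean side, assuming a hypothetical CountBean in place of RelatedItemInfo:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical Java-style bean: a no-arg constructor plus getter/setter
// pairs is what Encoders.bean introspects to build the schema.
class CountBean extends Serializable {
  private var source: String = _
  private var relatedDataset: Long = 0L

  def getSource: String = source
  def setSource(v: String): Unit = source = v
  def getRelatedDataset: Long = relatedDataset
  def setRelatedDataset(v: Long): Unit = relatedDataset = v
}

object BeanEncoderSketch {

  def load(spark: SparkSession, path: String): Dataset[CountBean] = {
    implicit val countEncoder: Encoder[CountBean] = Encoders.bean(classOf[CountBean])
    // The bean encoder exposes "source" and "relatedDataset" as real columns,
    // so downstream joins and filters can reference them by name.
    spark.read.load(path).as[CountBean]
  }
}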

View File

@ -12,6 +12,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.provision.RelatedItemInfo;
 import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
 import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
@ -138,54 +140,20 @@ public class ScholixSummary implements Serializable {
 this.datasources = datasources;
 }
-public static ScholixSummary fromJsonOAF(final Typology oafType, final String oafJson) {
+public static ScholixSummary fromOAF(final Oaf oaf) {
 try {
-final ObjectMapper mapper = new ObjectMapper();
 final RelatedItemInfo relatedItemInfo = new RelatedItemInfo();
-mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-switch (oafType) {
-case dataset:
-return summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo);
-case publication:
-return summaryFromPublication(
-mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo);
-case unknown:
-return summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo);
-}
-} catch (Throwable e) {
-throw new RuntimeException(e);
-}
-return null;
-}
-public static String fromJsonOAF(
-final Typology oafType, final String oafJson, final String relEntityJson) {
-try {
-final ObjectMapper mapper = new ObjectMapper();
-mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-RelatedItemInfo relatedItemInfo = mapper.readValue(relEntityJson, RelatedItemInfo.class);
-switch (oafType) {
-case dataset:
-return mapper
-.writeValueAsString(
-summaryFromDataset(mapper.readValue(oafJson, DLIDataset.class), relatedItemInfo));
-case publication:
-return mapper
-.writeValueAsString(
-summaryFromPublication(
-mapper.readValue(oafJson, DLIPublication.class), relatedItemInfo));
-case unknown:
-return mapper
-.writeValueAsString(
-summaryFromUnknown(mapper.readValue(oafJson, DLIUnknown.class), relatedItemInfo));
-}
+if (oaf instanceof DLIPublication)
+return summaryFromPublication((DLIPublication) oaf, relatedItemInfo);
+if (oaf instanceof DLIDataset)
+return summaryFromDataset((DLIDataset) oaf, relatedItemInfo);
+if (oaf instanceof DLIUnknown)
+return summaryFromUnknown((DLIUnknown) oaf, relatedItemInfo);
 } catch (Throwable e) {
 throw new RuntimeException(e);
 }
 return null;
 }

View File

@ -0,0 +1,4 @@
{
"cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
"cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
}

View File

@ -16,18 +16,6 @@
 <name>sparkExecutorMemory</name>
 <description>memory for individual executor</description>
 </property>
-<!-- <property>-->
-<!-- <name>index</name>-->
-<!-- <description>index name</description>-->
-<!-- </property>-->
-<!-- <property>-->
-<!-- <name>idScholix</name>-->
-<!-- <description>the identifier name of the scholix </description>-->
-<!-- </property>-->
-<!-- <property>-->
-<!-- <name>idSummary</name>-->
-<!-- <description>the identifier name of the summary</description>-->
-<!-- </property>-->
 </parameters>
 <start to="DeleteTargetPath"/>
@ -53,7 +41,7 @@
 <name>calculate for each ID the number of related Dataset, publication and Unknown</name>
 <class>eu.dnetlib.dhp.provision.SparkExtractRelationCount</class>
 <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
+<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
 <arg>-mt</arg> <arg>yarn-cluster</arg>
 <arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
 <arg>--relationPath</arg><arg>${graphPath}/relation</arg>
@ -69,9 +57,9 @@
 <master>yarn-cluster</master>
 <mode>cluster</mode>
 <name>generate Summary</name>
-<class>eu.dnetlib.dhp.provision.SparkGenerateSummary</class>
+<class>eu.dnetlib.dhp.provision.SparkGenerateSummaryIndex</class>
 <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
+<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT}</spark-opts>
 <arg>-mt</arg> <arg>yarn-cluster</arg>
 <arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
 <arg>--graphPath</arg><arg>${graphPath}</arg>
@ -87,9 +75,9 @@
 <master>yarn-cluster</master>
 <mode>cluster</mode>
 <name>generate Scholix</name>
-<class>eu.dnetlib.dhp.provision.SparkGenerateScholix</class>
+<class>eu.dnetlib.dhp.provision.SparkGenerateScholixIndex</class>
 <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-<spark-opts>--executor-memory 6G --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
+<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT}</spark-opts>
 <arg>-mt</arg> <arg>yarn-cluster</arg>
 <arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
 <arg>--graphPath</arg><arg>${graphPath}</arg>
@ -98,45 +86,5 @@
 <error to="Kill"/>
 </action>
-<!-- <action name="indexSummary">-->
-<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
-<!-- <job-tracker>${jobTracker}</job-tracker>-->
-<!-- <name-node>${nameNode}</name-node>-->
-<!-- <master>yarn-cluster</master>-->
-<!-- <mode>cluster</mode>-->
-<!-- <name>index Summary</name>-->
-<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
-<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
-<!-- <spark-opts>&#45;&#45;executor-memory ${sparkExecutorMemory} &#45;&#45;driver-memory=${sparkDriverMemory} ${sparkExtraOPT} &#45;&#45;conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>-->
-<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
-<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDirPath}/summary</arg>-->
-<!-- <arg>&#45;&#45;index</arg><arg>${index}_object</arg>-->
-<!-- <arg>&#45;&#45;idPath</arg><arg>id</arg>-->
-<!-- <arg>&#45;&#45;type</arg><arg>summary</arg>-->
-<!-- </spark>-->
-<!-- <ok to="indexScholix"/>-->
-<!-- <error to="Kill"/>-->
-<!-- </action>-->
-<!-- <action name="indexScholix">-->
-<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
-<!-- <job-tracker>${jobTracker}</job-tracker>-->
-<!-- <name-node>${nameNode}</name-node>-->
-<!-- <master>yarn-cluster</master>-->
-<!-- <mode>cluster</mode>-->
-<!-- <name>index scholix</name>-->
-<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
-<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
-<!-- <spark-opts>&#45;&#45;executor-memory ${sparkExecutorMemory} &#45;&#45;driver-memory=${sparkDriverMemory} ${sparkExtraOPT} &#45;&#45;conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>-->
-<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
-<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>-->
-<!-- <arg>&#45;&#45;index</arg><arg>${index}_scholix</arg>-->
-<!-- <arg>&#45;&#45;idPath</arg><arg>identifier</arg>-->
-<!-- <arg>&#45;&#45;type</arg><arg>scholix</arg>-->
-<!-- </spark>-->
-<!-- <ok to="End"/>-->
-<!-- <error to="Kill"/>-->
-<!-- </action>-->
 <end name="End"/>
 </workflow-app>