forked from D-Net/dnet-hadoop
fixed spark memory issue in SparkSplitOafTODLIEntities
This commit is contained in:
parent
3a81a940b7
commit
1d9fdb7367
|
@ -4,7 +4,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
|
||||||
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
|
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
|
||||||
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
|
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
@ -18,38 +17,38 @@ object SparkSplitOafTODLIEntities {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
|
|
||||||
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
|
|
||||||
parser.parseArgument(args)
|
|
||||||
|
|
||||||
val workingPath: String = parser.get("workingPath")
|
def extract_dataset(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
logger.info(s"Working dir path = $workingPath")
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000)
|
||||||
|
|
||||||
|
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000)
|
||||||
|
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
||||||
|
.map(s =>s.asInstanceOf[DLIDataset])
|
||||||
|
.union(ebi_dataset)
|
||||||
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(2000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def extract_publication(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||||
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
|
||||||
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
|
|
||||||
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val spark:SparkSession = SparkSession
|
|
||||||
.builder()
|
|
||||||
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
|
|
||||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
|
||||||
.master(parser.get("master"))
|
|
||||||
.getOrCreate()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
|
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000)
|
||||||
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
|
|
||||||
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
|
@ -60,20 +59,17 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(2000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
||||||
|
|
||||||
OAFDataset
|
}
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
|
||||||
.map(s =>s.asInstanceOf[DLIDataset])
|
|
||||||
.union(ebi_dataset)
|
|
||||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
|
||||||
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
|
||||||
.map(p => p._2)
|
|
||||||
.repartition(1000)
|
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
|
||||||
|
|
||||||
|
def extract_unknown(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
|
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
|
||||||
|
@ -82,9 +78,18 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[Relation])
|
.filter(s => s != null && s.isInstanceOf[Relation])
|
||||||
|
@ -94,9 +99,35 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getRelationAggregator().toColumn)
|
.agg(EBIAggregator.getRelationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(4000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
|
||||||
|
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
|
||||||
|
parser.parseArgument(args)
|
||||||
|
|
||||||
|
val workingPath: String = parser.get("workingPath")
|
||||||
|
val entity:String = parser.get("entity")
|
||||||
|
logger.info(s"Working dir path = $workingPath")
|
||||||
|
|
||||||
|
val spark:SparkSession = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
|
||||||
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
|
.master(parser.get("master"))
|
||||||
|
.getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
|
entity match {
|
||||||
|
case "publication" => extract_publication(spark, workingPath)
|
||||||
|
case "dataset" => extract_dataset(spark,workingPath)
|
||||||
|
case "relation" => extract_relations(spark, workingPath)
|
||||||
|
case "unknown" => extract_unknown(spark, workingPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
[
|
[
|
||||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true},
|
||||||
|
{"paramName":"e", "paramLongName":"entity", "paramDescription": "the work dir path", "paramRequired": true}
|
||||||
]
|
]
|
|
@ -14,30 +14,103 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="ExtractDLIEntities"/>
|
<start to="ExtractDLIPublication"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<action name="ExtractDLIEntities">
|
<action name="ExtractDLIPublication">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<job-tracker>${jobTracker}</job-tracker>
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Extract DLI Entities</name>
|
<name>Extract DLI Entities (Publication)</name>
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory ${sparkExecutorMemory}
|
--executor-memory ${sparkExecutorMemory}
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
${sparkExtraOPT}
|
${sparkExtraOPT}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>publication</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Dataset)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>dataset</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIUnknown"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIUnknown">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Unknown)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>unknown</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIRelation"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIRelation">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Relation)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>relation</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
Loading…
Reference in New Issue