diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala index d0df28b2d..822b16263 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala @@ -4,7 +4,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} import eu.dnetlib.dhp.sx.ebi.EBIAggregator -import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal} import org.apache.commons.io.IOUtils import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.LoggerFactory @@ -18,38 +17,38 @@ object SparkSplitOafTODLIEntities { } - def main(args: Array[String]): Unit = { - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))) - val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass) - parser.parseArgument(args) - val workingPath: String = parser.get("workingPath") - logger.info(s"Working dir path = $workingPath") + def extract_dataset(spark:SparkSession, workingPath:String) :Unit = { + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset] + + val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000) + + val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000) + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIDataset]) + .map(s =>s.asInstanceOf[DLIDataset]) + .union(ebi_dataset) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) + .map(p => p._2) + .repartition(2000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset") + + } + + def extract_publication(spark:SparkSession, workingPath:String) :Unit = { implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication] - implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset] - implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown] - implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation] - - - - val spark:SparkSession = SparkSession - .builder() - .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName) - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .master(parser.get("master")) - .getOrCreate() - - - val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] - val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset] - val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication] - val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation] - + val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000) OAFDataset @@ -60,20 +59,17 @@ object SparkSplitOafTODLIEntities { .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) .map(p => p._2) - .repartition(1000) + .repartition(2000) .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication") - OAFDataset - .filter(s => s != null && s.isInstanceOf[DLIDataset]) - .map(s =>s.asInstanceOf[DLIDataset]) - .union(ebi_dataset) - .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) - .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) - .map(p => p._2) - .repartition(1000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset") + } + def extract_unknown(spark:SparkSession, workingPath:String) :Unit = { + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown] + + val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] OAFDataset .filter(s => s != null && s.isInstanceOf[DLIUnknown]) @@ -82,9 +78,18 @@ object SparkSplitOafTODLIEntities { .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getDLIUnknownAggregator().toColumn) .map(p => p._2) - .repartition(1000) .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown") + } + + + def extract_relations(spark:SparkSession, workingPath:String) :Unit = { + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation] + + val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] + val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000) OAFDataset .filter(s => s != null && s.isInstanceOf[Relation]) @@ -94,9 +99,35 @@ object SparkSplitOafTODLIEntities { .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getRelationAggregator().toColumn) .map(p => p._2) - .repartition(1000) + .repartition(4000) .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation") + } + + + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))) + val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass) + parser.parseArgument(args) + + val workingPath: String = parser.get("workingPath") + val entity:String = parser.get("entity") + logger.info(s"Working dir path = $workingPath") + + val spark:SparkSession = SparkSession + .builder() + .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .master(parser.get("master")) + .getOrCreate() + + + entity match { + case "publication" => extract_publication(spark, workingPath) + case "dataset" => extract_dataset(spark,workingPath) + case "relation" => extract_relations(spark, workingPath) + case "unknown" => extract_unknown(spark, workingPath) + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json index febcfc898..7878931af 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json @@ -1,4 +1,5 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true} + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}, + {"paramName":"e", "paramLongName":"entity", "paramDescription": "the work dir path", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml index fabe7510b..9d06c42d6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml @@ -14,30 +14,103 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + ${jobTracker} ${nameNode} yarn-cluster cluster - Extract DLI Entities + Extract DLI Entities (Publication) eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 ${sparkExtraOPT} -mt yarn-cluster --workingPath${workingPath} + -epublication + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Extract DLI Entities (Dataset) + eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=5000 + ${sparkExtraOPT} + + -mt yarn-cluster + --workingPath${workingPath} + -edataset + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Extract DLI Entities (Unknown) + eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=5000 + ${sparkExtraOPT} + + -mt yarn-cluster + --workingPath${workingPath} + -eunknown + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Extract DLI Entities (Relation) + eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=5000 + ${sparkExtraOPT} + + -mt yarn-cluster + --workingPath${workingPath} + -erelation