From 3721df7aa6d54bf20c1c709a1dcd3be3d8dc3af4 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 29 Jul 2021 10:45:24 +0200 Subject: [PATCH] refactoring create actionset of scholexplorer, moved on package dhp-aggregation --- .../scholix}/SparkCreateActionset.scala | 22 +++++++++---------- .../scholix}/SparkSaveActionSet.scala | 14 ++++++------ .../dhp/sx/actionset/generate_actionset.json | 0 .../sx/actionset/oozie_app/config-default.xml | 0 .../dhp/sx/actionset/oozie_app/workflow.xml | 4 ++-- .../dhp/sx/actionset/save_actionset.json | 0 6 files changed, 20 insertions(+), 20 deletions(-) rename dhp-workflows/{dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision => dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix}/SparkCreateActionset.scala (63%) rename dhp-workflows/{dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision => dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix}/SparkSaveActionSet.scala (86%) rename dhp-workflows/{dhp-graph-provision => dhp-aggregation}/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json (100%) rename dhp-workflows/{dhp-graph-provision => dhp-aggregation}/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml (100%) rename dhp-workflows/{dhp-graph-provision => dhp-aggregation}/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml (95%) rename dhp-workflows/{dhp-graph-provision => dhp-aggregation}/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json (100%) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala similarity index 63% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala index faf386d25..b78f411ee 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala @@ -1,9 +1,9 @@ -package eu.dnetlib.dhp.sx.provision +package eu.dnetlib.dhp.actionmanager.scholix import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} -import org.apache.spark.{SparkConf, sql} -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.SparkConf +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} import scala.io.Source @@ -34,16 +34,16 @@ object SparkCreateActionset { val workingDirFolder = parser.get("workingDirFolder") log.info(s"workingDirFolder -> $workingDirFolder") - implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val resultEncoders:Encoder[Result] = Encoders.kryo[Result] - implicit val relationEncoders:Encoder[Relation] = Encoders.kryo[Relation] + implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result] + implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation] - import spark.implicits._ + import spark.implicits._ val relation = spark.read.load(s"$sourcePath/relation").as[Relation] - relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) - .flatMap(r => List(r.getSource,r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation") + relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) + .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation") val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String] @@ -53,12 +53,12 @@ object SparkCreateActionset { log.info("save relation filtered") - relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) + relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf") log.info("saving entities") - val entities:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders)) + val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders)) entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala similarity index 86% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala index d1d0b8424..1df7ea3fb 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala @@ -1,9 +1,9 @@ -package eu.dnetlib.dhp.sx.provision +package eu.dnetlib.dhp.actionmanager.scholix import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Software, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation} import org.apache.hadoop.io.Text import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.mapred.SequenceFileOutputFormat @@ -73,13 +73,13 @@ object SparkSaveActionSet { val targetPath = parser.get("targetPath") log.info(s"targetPath -> $targetPath") - implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) + implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING) spark.read.load(sourcePath).as[Oaf] - .map(o =>toActionSet(o)) - .filter(o => o!= null) - .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + .map(o => toActionSet(o)) + .filter(o => o != null) + .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec]) } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json similarity index 100% rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml similarity index 95% rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml index 7c4b3dd26..8c045fcfe 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml @@ -26,7 +26,7 @@ cluster Create Action Set eu.dnetlib.dhp.sx.provision.SparkCreateActionset - dhp-graph-provision-${projectVersion}.jar + dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -53,7 +53,7 @@ cluster Save Action Set eu.dnetlib.dhp.sx.provision.SparkSaveActionSet - dhp-graph-provision-${projectVersion}.jar + dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json similarity index 100% rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json