diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
similarity index 63%
rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
index faf386d25..b78f411ee 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
@@ -1,9 +1,9 @@
-package eu.dnetlib.dhp.sx.provision
+package eu.dnetlib.dhp.actionmanager.scholix
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
-import org.apache.spark.{SparkConf, sql}
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
@@ -34,16 +34,16 @@ object SparkCreateActionset {
val workingDirFolder = parser.get("workingDirFolder")
log.info(s"workingDirFolder -> $workingDirFolder")
- implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val resultEncoders:Encoder[Result] = Encoders.kryo[Result]
- implicit val relationEncoders:Encoder[Relation] = Encoders.kryo[Relation]
+ implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
+ implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
- import spark.implicits._
+ import spark.implicits._
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
- relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
- .flatMap(r => List(r.getSource,r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
+ relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
+ .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
@@ -53,12 +53,12 @@ object SparkCreateActionset {
log.info("save relation filtered")
- relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
+ relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
log.info("saving entities")
- val entities:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
+ val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
similarity index 86%
rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
index d1d0b8424..1df7ea3fb 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
@@ -1,9 +1,9 @@
-package eu.dnetlib.dhp.sx.provision
+package eu.dnetlib.dhp.actionmanager.scholix
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Software, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
@@ -73,13 +73,13 @@ object SparkSaveActionSet {
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
- implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
+ implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
- .map(o =>toActionSet(o))
- .filter(o => o!= null)
- .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
+ .map(o => toActionSet(o))
+ .filter(o => o != null)
+ .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json
similarity index 100%
rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
similarity index 95%
rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
index 7c4b3dd26..8c045fcfe 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
@@ -26,7 +26,7 @@
cluster
Create Action Set
- eu.dnetlib.dhp.sx.provision.SparkCreateActionset
+ eu.dnetlib.dhp.actionmanager.scholix.SparkCreateActionset
- dhp-graph-provision-${projectVersion}.jar
+ dhp-aggregation-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
@@ -53,7 +53,7 @@
cluster
Save Action Set
- eu.dnetlib.dhp.sx.provision.SparkSaveActionSet
+ eu.dnetlib.dhp.actionmanager.scholix.SparkSaveActionSet
- dhp-graph-provision-${projectVersion}.jar
+ dhp-aggregation-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json
similarity index 100%
rename from dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json