refactoring create actionset of scholexplorer, moved on package dhp-aggregation

This commit is contained in:
Sandro La Bruzzo 2021-07-29 10:45:24 +02:00
parent 6aef3e8f46
commit 3721df7aa6
6 changed files with 20 additions and 20 deletions

View File

@ -1,9 +1,9 @@
package eu.dnetlib.dhp.sx.provision package eu.dnetlib.dhp.actionmanager.scholix
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.{SparkConf, sql} import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source import scala.io.Source
@ -34,16 +34,16 @@ object SparkCreateActionset {
val workingDirFolder = parser.get("workingDirFolder") val workingDirFolder = parser.get("workingDirFolder")
log.info(s"workingDirFolder -> $workingDirFolder") log.info(s"workingDirFolder -> $workingDirFolder")
implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resultEncoders:Encoder[Result] = Encoders.kryo[Result] implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
implicit val relationEncoders:Encoder[Relation] = Encoders.kryo[Relation] implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._ import spark.implicits._
val relation = spark.read.load(s"$sourcePath/relation").as[Relation] val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.flatMap(r => List(r.getSource,r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation") .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String] val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
@ -53,12 +53,12 @@ object SparkCreateActionset {
log.info("save relation filtered") log.info("save relation filtered")
relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf") .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
log.info("saving entities") log.info("saving entities")
val entities:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders)) val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])

View File

@ -1,9 +1,9 @@
package eu.dnetlib.dhp.sx.provision package eu.dnetlib.dhp.actionmanager.scholix
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Software, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
import org.apache.hadoop.io.Text import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.hadoop.mapred.SequenceFileOutputFormat
@ -73,13 +73,13 @@ object SparkSaveActionSet {
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath") log.info(s"targetPath -> $targetPath")
implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
spark.read.load(sourcePath).as[Oaf] spark.read.load(sourcePath).as[Oaf]
.map(o =>toActionSet(o)) .map(o => toActionSet(o))
.filter(o => o!= null) .filter(o => o != null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
} }

View File

@ -26,7 +26,7 @@
<mode>cluster</mode> <mode>cluster</mode>
<name>Create Action Set</name> <name>Create Action Set</name>
<class>eu.dnetlib.dhp.sx.provision.SparkCreateActionset</class> <class>eu.dnetlib.dhp.sx.provision.SparkCreateActionset</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -53,7 +53,7 @@
<mode>cluster</mode> <mode>cluster</mode>
<name>Save Action Set</name> <name>Save Action Set</name>
<class>eu.dnetlib.dhp.sx.provision.SparkSaveActionSet</class> <class>eu.dnetlib.dhp.sx.provision.SparkSaveActionSet</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}