diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala new file mode 100644 index 0000000000..62f53e4ad0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.apache.spark.sql.functions.max +import org.slf4j.Logger + +class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { + + + def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):String = { + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + import spark.implicits._ + + val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result] + + entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0) + + + + } + + + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { + val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") + + val datacitePath = parser.get("datacitePath") + log.info(s"DatacitePath is '$datacitePath'") + + + log.info("Retrieve last entities collected From") + + implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result] + + val lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities") + + + + + + + + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml index 17996c82c0..85c0d486d8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml @@ -79,7 +79,7 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=20000 + --conf spark.sql.shuffle.partitions=30000 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 23f039c706..2115df1fd2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -2,12 +2,11 @@ package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Software,Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} - object SparkConvertRDDtoDataset { def main(args: Array[String]): Unit = { @@ -32,39 +31,40 @@ object SparkConvertRDDtoDataset { val entityPath = s"$t/entities" val relPath = s"$t/relation" val mapper = new ObjectMapper() - implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) - implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) - implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) - implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) log.info("Converting dataset") - val rddDataset = spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])) + val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset") log.info("Converting publication") - val rddPublication = spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])) + val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication") log.info("Converting software") - val rddSoftware = spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])) + val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software") log.info("Converting otherresearchproduct") - val rddOtherResearchProduct = spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])) + val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct") log.info("Converting Relation") - val relationSemanticFilter = List("cites", "iscitedby", "merges", "ismergedin") + val relationSemanticFilter = List("cites", "iscitedby","merges", "ismergedin", "HasAmongTopNSimilarDocuments","IsAmongTopNSimilarDocuments" ) - val rddRelation = spark.sparkContext.textFile(s"$sourcePath/relation") + val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation") .map(s => mapper.readValue(s, classOf[Relation])) - .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) + .filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false) + .filter(r=> r.getSource.startsWith("50") && r.getTarget.startsWith("50")) .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")