forked from D-Net/dnet-hadoop
55 lines
1.4 KiB
Scala
55 lines
1.4 KiB
Scala
|
package eu.dnetlib.dhp.sx.graph
|
||
|
|
||
|
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
|
||
|
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||
|
import org.apache.spark.sql.functions.max
|
||
|
import org.slf4j.Logger
|
||
|
|
||
|
/** Spark job that reads the `sourcePath` and `datacitePath` arguments and determines the
  * most recent `dateofcollection` among the entities stored under `sourcePath/entities`.
  *
  * @param propertyPath forwarded unchanged to [[AbstractScalaApplication]]
  * @param args         forwarded unchanged to [[AbstractScalaApplication]]
  * @param log          logger used by this job; also forwarded to [[AbstractScalaApplication]]
  */
class SparkRetrieveDataciteDelta(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log: Logger) {

  /** Finds the greatest `dateofcollection` among the entities stored under `entitiesPath`.
    *
    * Entities whose collection date is `null` are skipped. The dates are handled as strings,
    * so the maximum is whatever Spark's `max` aggregate yields for a string column.
    *
    * @param spark        Spark session used to read the entities
    * @param entitiesPath directory whose children are all loaded as [[Result]] records
    * @return the maximum collection date found.
    *         NOTE(review): presumably `null` when no entity carries a collection date,
    *         because `max` over zero rows produces a null cell - confirm and guard in callers.
    */
  def retrieveLastCollectedFrom(spark: SparkSession, entitiesPath: String): String = {
    log.info("Retrieve last entities collected From")

    // The Kryo encoder backs the `.as[Result]` conversion; `spark.implicits._` supplies the
    // String encoder required by the `map` that extracts the dates.
    implicit val resultEncoder: Encoder[Result] = Encoders.kryo[Result]
    import spark.implicits._

    val collectionDates = spark.read
      .load(s"$entitiesPath/*")
      .as[Result]
      .filter(r => r.getDateofcollection != null)
      .map(_.getDateofcollection)

    // The single column of the mapped dataset is addressed by the name "value".
    collectionDates.select(max("value")).first.getString(0)
  }

  /** Entry point of the Spark application: the whole logic of this job is defined here.
    *
    * Reads the `sourcePath` and `datacitePath` arguments, logs them, then retrieves and logs
    * the last collection date of the entities found under `sourcePath/entities`.
    */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"SourcePath is '$sourcePath'")

    val datacitePath = parser.get("datacitePath")
    log.info(s"DatacitePath is '$datacitePath'")

    // retrieveLastCollectedFrom logs its own start message and declares the encoder it needs,
    // so neither is repeated here.
    val lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities")
    log.info(s"Last collection date is '$lastCollectionDate'")
  }
}
|