From 66702b1973269c77b9780839d80375ad3e9843d9 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 28 Sep 2021 08:58:59 +0200 Subject: [PATCH] Added node to update datacite --- .../SparkDownloadUpdateDatacite.scala | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala new file mode 100644 index 000000000..0b459fcf6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala @@ -0,0 +1,53 @@ +package eu.dnetlib.dhp.actionmanager.datacite + + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.LocalFileSystem +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.apache.spark.sql.functions.max +import org.slf4j.{Logger, LoggerFactory} + +import java.text.SimpleDateFormat +import java.util.{Date, Locale} +import scala.io.Source + +object SparkDownloadUpdateDatacite { + val log: Logger = LoggerFactory.getLogger(getClass) + + def main(args: Array[String]): Unit = { + + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") + + val hdfsuri = parser.get("namenode") + log.info(s"namenode is $hdfsuri") + + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(getClass.getSimpleName) + .master(master) + .getOrCreate() + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result] + + import spark.implicits._ + + + val maxDate:String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) + val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US) + val string_to_date =ISO8601FORMAT.parse(maxDate) + val ts = string_to_date.getTime + + + } + +}