From 1dfda3624e6c030ee88c51e7c2f9a526ad1ae3c7 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 26 Mar 2021 13:56:29 +0100
Subject: [PATCH] improved workflow importing datacite

---
 .../datacite/AbstractRestClient.scala         |  5 +-
 .../datacite/DataciteAPIImporter.scala        | 10 +-
 .../DataciteToOAFTransformation.scala         |  3 +-
 .../datacite/ImportDatacite.scala             | 92 +++++++++++--------
 .../datacite/import_from_api.json             |  6 ++
 .../datacite/oozie_app/workflow.xml           |  8 +-
 .../SparkGenerateDOIBoostActionSet.scala      |  2 +-
 .../crossref/CrossrefMappingTest.scala        | 11 +++
 8 files changed, 90 insertions(+), 47 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
index 3c77700757..8df2032830 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
@@ -57,10 +57,13 @@ abstract class AbstractRestClient extends Iterator[String]{
   private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
     val client = HttpClients.createDefault
+    var tries = 4
     try {
-      var tries = 4
       while (tries > 0) {
+
+        println(s"requesting ${r.getURI}")
         val response = client.execute(r)
+        println(s"get response with status${response.getStatusLine.getStatusCode}")
         if (response.getStatusLine.getStatusCode > 400) {
           tries -= 1
         }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
index c2ad6855cb..36ec9e8c33 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.actionmanager.datacite
 import org.json4s.{DefaultFormats, JValue}
 import org.json4s.jackson.JsonMethods.{compact, parse, render}
 
-class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends AbstractRestClient {
+class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {
 
   override def extractInfo(input: String): Unit = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@@ -16,9 +16,15 @@ class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends Abstra
     current_index = 0
   }
 
+  def get_url():String ={
+    val to = if (until> 0) s"$until" else "*"
+    s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20$to]"
+
+  }
+
   override def getBufferData(): Unit = {
     if (!complete) {
-      val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20*]")
+      val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(get_url())
       extractInfo(response)
     }
   }
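Note on the DataciteAPIImporter hunk above: get_url() now closes the updated:[from TO to] range whenever an upper bound (until) is configured, instead of always issuing an open-ended query. The following standalone Scala sketch is not part of the patch; names and sample values are illustrative only, and it simply mirrors the URL construction shown in the hunk.

    // Sketch of the windowed DataCite query built by get_url() above.
    object DataciteWindowedUrlSketch {

      // Open range "updated:[from TO *]" when no upper bound is set, closed range otherwise.
      def getUrl(timestamp: Long, blocks: Long, until: Long): String = {
        val to = if (until > 0) s"$until" else "*"
        s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20$to]"
      }

      def main(args: Array[String]): Unit = {
        println(getUrl(1613135645000L, 1000, 1613185645000L)) // bounded window
        println(getUrl(1613135645000L, 1000, -1))              // open-ended, as before the patch
      }
    }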
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
index 1ae1f086e3..1776a4ad68 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
@@ -164,9 +164,8 @@ object DataciteToOAFTransformation {
       case _: Throwable => try {
         return Some(LocalDate.parse(a_date, df_it).toString)
       } catch {
-        case _: Throwable => try {
+        case _: Throwable =>
           return None
-        }
       }
     }
   }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
index d5edb674a6..6cec4ea344 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.actionmanager.datacite
 
+import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -15,7 +16,7 @@ import org.apache.spark.sql.functions.max
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.time.format.DateTimeFormatter._
-import java.time.{LocalDateTime, ZoneOffset}
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
 import scala.io.Source
 
 object ImportDatacite {
@@ -23,21 +24,20 @@ object ImportDatacite {
 
   val log: Logger = LoggerFactory.getLogger(ImportDatacite.getClass)
 
-  def convertAPIStringToDataciteItem(input:String): DataciteType = {
+  def convertAPIStringToDataciteItem(input: String): DataciteType = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     lazy val json: org.json4s.JValue = parse(input)
     val doi = (json \ "attributes" \ "doi").extract[String].toLowerCase
 
     val isActive = (json \ "attributes" \ "isActive").extract[Boolean]
 
-    val timestamp_string = (json \ "attributes" \ "updated").extract[String]
+    val timestamp_string = (json \ "attributes" \ "updated").extract[String]
     val dt = LocalDateTime.parse(timestamp_string, ISO_DATE_TIME)
-    DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli/1000, isActive = isActive, json = input)
+    DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli / 1000, isActive = isActive, json = input)
 
   }
 
-
   def main(args: Array[String]): Unit = {
     val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json")).mkString)
@@ -53,9 +53,13 @@ object ImportDatacite {
     val dataciteDump = parser.get("dataciteDumpPath")
     log.info(s"dataciteDump is $dataciteDump")
 
-    val hdfsTargetPath =new Path(targetPath)
+    val hdfsTargetPath = new Path(targetPath)
     log.info(s"hdfsTargetPath is $hdfsTargetPath")
 
+
+    val spkipImport = parser.get("skipImport")
+    log.info(s"skipImport is $spkipImport")
+
     val spark: SparkSession = SparkSession.builder()
       .appName(ImportDatacite.getClass.getSimpleName)
       .master(master)
@@ -69,7 +73,7 @@
     // Because of Maven
     conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
     conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
-    val sc:SparkContext = spark.sparkContext
+    val sc: SparkContext = spark.sparkContext
     sc.setLogLevel("ERROR")
 
     import spark.implicits._
@@ -84,14 +88,14 @@ object ImportDatacite {
         return a
       if (a == null)
         return b
-      if(a.timestamp >b.timestamp) {
+      if (a.timestamp > b.timestamp) {
         return a
       }
       b
     }
 
     override def merge(a: DataciteType, b: DataciteType): DataciteType = {
-      reduce(a,b)
+      reduce(a, b)
     }
 
     override def bufferEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]
@@ -101,69 +105,77 @@ object ImportDatacite {
 
     override def finish(reduction: DataciteType): DataciteType = reduction
   }
 
-    val dump:Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
+    val dump: Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
     val ts = dump.select(max("timestamp")).first().getLong(0)
 
-    log.info(s"last Timestamp is $ts")
+    println(s"last Timestamp is $ts")
 
-    val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
+    val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf)
+    println(s"Imported from Datacite API $cnt documents")
 
-    log.info(s"Imported from Datacite API $cnt documents")
+    if (cnt > 0) {
 
-    if (cnt > 0) {
-
-      val inputRdd:RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
+      val inputRdd: RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
         .map(s => s._2.toString)
         .map(s => convertAPIStringToDataciteItem(s))
       spark.createDataset(inputRdd).write.mode(SaveMode.Overwrite).save(s"${targetPath}_dataset")
 
-      val ds:Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
+      val ds: Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
 
       dump
         .union(ds)
         .groupByKey(_.doi)
         .agg(dataciteAggregator.toColumn)
-        .map(s=>s._2)
+        .map(s => s._2)
         .repartition(4000)
         .write.mode(SaveMode.Overwrite).save(s"${dataciteDump}_updated")
 
       val fs = FileSystem.get(sc.hadoopConfiguration)
       fs.delete(new Path(s"$dataciteDump"), true)
-      fs.rename(new Path(s"${dataciteDump}_updated"),new Path(s"$dataciteDump"))
+      fs.rename(new Path(s"${dataciteDump}_updated"), new Path(s"$dataciteDump"))
     }
   }
 
-  private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration):Long = {
-    val client = new DataciteAPIImporter(timestamp*1000, 1000)
+  private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = {
+    var from:Long = timestamp * 1000
+    val delta:Long = 50000000L
+    var client: DataciteAPIImporter = null
+    val now :Long =System.currentTimeMillis()
     var i = 0
     try {
       val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
       try {
         var start: Long = System.currentTimeMillis
-        var end: Long = 0
-        val key: IntWritable = new IntWritable(i)
-        val value: Text = new Text
-        while ( {
-          client.hasNext
-        }) {
-          key.set({
-            i += 1;
-            i - 1
-          })
-          value.set(client.next())
-          writer.append(key, value)
-          writer.hflush()
-          if (i % 1000 == 0) {
-            end = System.currentTimeMillis
-            val time = (end - start) / 1000.0F
-            println(s"Imported $i in $time seconds")
-            start = System.currentTimeMillis
+        while (from < now) {
+          client = new DataciteAPIImporter(from, 1000, from + delta)
+          var end: Long = 0
+          val key: IntWritable = new IntWritable(i)
+          val value: Text = new Text
+          while (client.hasNext) {
+            key.set({
+              i += 1;
+              i - 1
+            })
+            value.set(client.next())
+            writer.append(key, value)
+            writer.hflush()
+            if (i % 1000 == 0) {
+              end = System.currentTimeMillis
+              val time = (end - start) / 1000.0F
+              println(s"Imported $i in $time seconds")
+              start = System.currentTimeMillis
+            }
          }
+          println(s"updating from value: $from -> ${from+delta}")
+          from = from + delta
        }
+      } catch {
+        case e: Throwable =>
+          println("Error", e)
      } finally if (writer != null)
        writer.close()
    }
    i
  }
+
 }
\ No newline at end of file
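Note on the writeSequenceFile change above: instead of a single importer scrolling from the last dump timestamp to "now", the import now walks the time axis in fixed windows of delta milliseconds and creates one DataciteAPIImporter per window. A minimal standalone sketch of that window enumeration follows; it is not part of the patch, the fetch itself is omitted, and the sample values are illustrative only.

    // Sketch of the window enumeration used by the import loop above.
    object DataciteImportWindowsSketch {

      // Split [fromMillis, nowMillis) into consecutive windows of deltaMillis.
      def windows(fromMillis: Long, nowMillis: Long, deltaMillis: Long): Iterator[(Long, Long)] =
        Iterator.iterate(fromMillis)(_ + deltaMillis)
          .takeWhile(_ < nowMillis)
          .map(start => (start, start + deltaMillis))

      def main(args: Array[String]): Unit = {
        val lastDumpTimestampSeconds = 1613135645L // illustrative value
        val delta = 50000000L                      // window size in milliseconds, as in the patch
        windows(lastDumpTimestampSeconds * 1000, System.currentTimeMillis(), delta)
          .take(3)
          .foreach { case (from, to) => println(s"would request DOIs updated in [$from, $to)") }
      }
    }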
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
index 967e4445a6..69fb039ba8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
@@ -12,6 +12,12 @@
     "paramDescription": "the path of the Datacite dump",
     "paramRequired": true
   },
+  {
+    "paramName": "s",
+    "paramLongName": "skipImport",
+    "paramDescription": "avoid downloading new items and only apply the previous update",
+    "paramRequired": false
+  },
   {
     "paramName": "n",
     "paramLongName": "namenode",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
index 047794c9c1..15378c6c7b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
@@ -13,6 +13,11 @@
             <name>nativeInputPath</name>
             <description>the path of the input MDStore</description>
         </property>
+        <property>
+            <name>skipimport</name>
+            <value>false</value>
+            <description>skip downloading new items from the Datacite API and only apply the previous update</description>
+        </property>
@@ -51,6 +56,7 @@
             <arg>-t</arg><arg>${nativeInputPath}</arg>
             <arg>-d</arg><arg>${mdstoreInputPath}</arg>
             <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-s</arg><arg>${skipimport}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -81,7 +87,7 @@
             <arg>-tr</arg><arg>${isLookupUrl}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
-
+
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
index 21d3454da7..3bfca0859b 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
@@ -57,7 +57,7 @@
 
     val asCRelation = spark.read.load(crossRefRelation).as[Relation]
-      .filter(r => r!= null || (r.getSource != null && r.getTarget != null))
+      .filter(r => r!= null && r.getSource != null && r.getTarget != null)
       .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
 
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
index 4568e23a57..cc112528e6 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
@@ -59,6 +59,17 @@ class CrossrefMappingTest {
 
   }
 
+  @Test
+  def testSum() :Unit = {
+    val from:Long = 1613135645000L
+    val delta:Long = 1000000L
+
+
+    println(s"updating from value: $from -> ${from+delta}")
+
+
+  }
+
   @Test
   def testOrcidID() :Unit = {
     val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString
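Closing note on the SparkGenerateDOIBoostActionSet hunk above: replacing || with && is what makes the null check effective, because the old predicate still dereferenced r when r itself was null. A small self-contained sketch of the corrected predicate follows; it is not part of the patch, and Rel is a hypothetical stand-in for the dhp Relation class.

    // Stand-in type used only to illustrate the corrected, null-safe filter.
    final case class Rel(source: String, target: String)

    object RelationFilterSketch {

      // Keep a relation only if it and both of its endpoints are non-null.
      def isComplete(r: Rel): Boolean =
        r != null && r.source != null && r.target != null

      def main(args: Array[String]): Unit = {
        val rels = Seq(Rel("50|doi_a", "50|doi_b"), Rel(null, "50|doi_b"), null)
        println(rels.filter(isComplete)) // keeps only the first, fully populated relation
      }
    }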