diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
index 3c77700757..8df2032830 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
@@ -57,10 +57,13 @@ abstract class AbstractRestClient extends Iterator[String]{
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
val client = HttpClients.createDefault
+ var tries = 4
try {
- var tries = 4
while (tries > 0) {
+
+ println(s"requesting ${r.getURI}")
val response = client.execute(r)
+ println(s"get response with status${response.getStatusLine.getStatusCode}")
if (response.getStatusLine.getStatusCode > 400) {
tries -= 1
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
index c2ad6855cb..36ec9e8c33 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.actionmanager.datacite
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
-class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends AbstractRestClient {
+class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {
override def extractInfo(input: String): Unit = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@@ -16,9 +16,15 @@ class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends Abstra
current_index = 0
}
+ def get_url():String ={
+ val to = if (until> 0) s"$until" else "*"
+ s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20$to]"
+
+ }
+
override def getBufferData(): Unit = {
if (!complete) {
- val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20*]")
+ val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(get_url())
extractInfo(response)
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
index 1ae1f086e3..1776a4ad68 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
@@ -164,9 +164,8 @@ object DataciteToOAFTransformation {
case _: Throwable => try {
return Some(LocalDate.parse(a_date, df_it).toString)
} catch {
- case _: Throwable => try {
+ case _: Throwable =>
return None
- }
}
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
index d5edb674a6..6cec4ea344 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
@@ -1,5 +1,6 @@
package eu.dnetlib.dhp.actionmanager.datacite
+import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -15,7 +16,7 @@ import org.apache.spark.sql.functions.max
import org.slf4j.{Logger, LoggerFactory}
import java.time.format.DateTimeFormatter._
-import java.time.{LocalDateTime, ZoneOffset}
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import scala.io.Source
object ImportDatacite {
@@ -23,21 +24,20 @@ object ImportDatacite {
val log: Logger = LoggerFactory.getLogger(ImportDatacite.getClass)
- def convertAPIStringToDataciteItem(input:String): DataciteType = {
+ def convertAPIStringToDataciteItem(input: String): DataciteType = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input)
val doi = (json \ "attributes" \ "doi").extract[String].toLowerCase
val isActive = (json \ "attributes" \ "isActive").extract[Boolean]
- val timestamp_string = (json \ "attributes" \ "updated").extract[String]
+ val timestamp_string = (json \ "attributes" \ "updated").extract[String]
val dt = LocalDateTime.parse(timestamp_string, ISO_DATE_TIME)
- DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli/1000, isActive = isActive, json = input)
+ DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli / 1000, isActive = isActive, json = input)
}
-
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json")).mkString)
@@ -53,9 +53,13 @@ object ImportDatacite {
val dataciteDump = parser.get("dataciteDumpPath")
log.info(s"dataciteDump is $dataciteDump")
- val hdfsTargetPath =new Path(targetPath)
+ val hdfsTargetPath = new Path(targetPath)
log.info(s"hdfsTargetPath is $hdfsTargetPath")
+
+ val spkipImport = parser.get("skipImport")
+ log.info(s"skipImport is $spkipImport")
+
val spark: SparkSession = SparkSession.builder()
.appName(ImportDatacite.getClass.getSimpleName)
.master(master)
@@ -69,7 +73,7 @@ object ImportDatacite {
// Because of Maven
conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
- val sc:SparkContext = spark.sparkContext
+ val sc: SparkContext = spark.sparkContext
sc.setLogLevel("ERROR")
import spark.implicits._
@@ -84,14 +88,14 @@ object ImportDatacite {
return a
if (a == null)
return b
- if(a.timestamp >b.timestamp) {
+ if (a.timestamp > b.timestamp) {
return a
}
b
}
override def merge(a: DataciteType, b: DataciteType): DataciteType = {
- reduce(a,b)
+ reduce(a, b)
}
override def bufferEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]
@@ -101,69 +105,77 @@ object ImportDatacite {
override def finish(reduction: DataciteType): DataciteType = reduction
}
- val dump:Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
+ val dump: Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
val ts = dump.select(max("timestamp")).first().getLong(0)
- log.info(s"last Timestamp is $ts")
+ println(s"last Timestamp is $ts")
- val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
+ val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf)
+ println(s"Imported from Datacite API $cnt documents")
- log.info(s"Imported from Datacite API $cnt documents")
+ if (cnt > 0) {
- if (cnt > 0) {
-
- val inputRdd:RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
+ val inputRdd: RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
.map(s => s._2.toString)
.map(s => convertAPIStringToDataciteItem(s))
spark.createDataset(inputRdd).write.mode(SaveMode.Overwrite).save(s"${targetPath}_dataset")
- val ds:Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
+ val ds: Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
dump
.union(ds)
.groupByKey(_.doi)
.agg(dataciteAggregator.toColumn)
- .map(s=>s._2)
+ .map(s => s._2)
.repartition(4000)
.write.mode(SaveMode.Overwrite).save(s"${dataciteDump}_updated")
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(s"$dataciteDump"), true)
- fs.rename(new Path(s"${dataciteDump}_updated"),new Path(s"$dataciteDump"))
+ fs.rename(new Path(s"${dataciteDump}_updated"), new Path(s"$dataciteDump"))
}
}
- private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration):Long = {
- val client = new DataciteAPIImporter(timestamp*1000, 1000)
+ private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = {
+ var from:Long = timestamp * 1000
+ val delta:Long = 50000000L
+ var client: DataciteAPIImporter = null
+ val now :Long =System.currentTimeMillis()
var i = 0
try {
val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
try {
-
var start: Long = System.currentTimeMillis
- var end: Long = 0
- val key: IntWritable = new IntWritable(i)
- val value: Text = new Text
- while ( {
- client.hasNext
- }) {
- key.set({
- i += 1;
- i - 1
- })
- value.set(client.next())
- writer.append(key, value)
- writer.hflush()
- if (i % 1000 == 0) {
- end = System.currentTimeMillis
- val time = (end - start) / 1000.0F
- println(s"Imported $i in $time seconds")
- start = System.currentTimeMillis
+ while (from < now) {
+ client = new DataciteAPIImporter(from, 1000, from + delta)
+ var end: Long = 0
+ val key: IntWritable = new IntWritable(i)
+ val value: Text = new Text
+ while (client.hasNext) {
+ key.set({
+ i += 1;
+ i - 1
+ })
+ value.set(client.next())
+ writer.append(key, value)
+ writer.hflush()
+ if (i % 1000 == 0) {
+ end = System.currentTimeMillis
+ val time = (end - start) / 1000.0F
+ println(s"Imported $i in $time seconds")
+ start = System.currentTimeMillis
+ }
}
+ println(s"updating from value: $from -> ${from+delta}")
+ from = from + delta
}
+ } catch {
+ case e: Throwable =>
+ println("Error", e)
} finally if (writer != null) writer.close()
}
i
}
+
}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
index 967e4445a6..69fb039ba8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
@@ -12,6 +12,12 @@
"paramDescription": "the path of the Datacite dump",
"paramRequired": true
},
+ {
+ "paramName": "s",
+ "paramLongName": "skipImport",
+ "paramDescription": "avoid to downlaod new items but apply the previous update",
+ "paramRequired": false
+ },
{
"paramName": "n",
"paramLongName": "namenode",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
index 047794c9c1..15378c6c7b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
@@ -13,6 +13,11 @@
nativeInputPath
the path of the input MDStore
+
+ skipimport
+ false
+ the path of the input MDStore
+
@@ -51,6 +56,7 @@
-t${nativeInputPath}
-d${mdstoreInputPath}
-n${nameNode}
+ -s${skipimport}
--masteryarn-cluster
@@ -81,7 +87,7 @@
-tr${isLookupUrl}
--masteryarn-cluster
-
+
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
index 21d3454da7..3bfca0859b 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
@@ -57,7 +57,7 @@ object SparkGenerateDOIBoostActionSet {
val asCRelation = spark.read.load(crossRefRelation).as[Relation]
- .filter(r => r!= null || (r.getSource != null && r.getTarget != null))
+ .filter(r => r!= null && r.getSource != null && r.getTarget != null)
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
index 4568e23a57..cc112528e6 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
@@ -59,6 +59,17 @@ class CrossrefMappingTest {
}
+ @Test
+ def testSum() :Unit = {
+ val from:Long = 1613135645000L
+ val delta:Long = 1000000L
+
+
+ println(s"updating from value: $from -> ${from+delta}")
+
+
+ }
+
@Test
def testOrcidID() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString