improved workflow importing datacite

Sandro La Bruzzo 2021-03-26 13:56:29 +01:00
parent b5b7dc2104
commit 1dfda3624e
8 changed files with 90 additions and 47 deletions

View File

@@ -57,10 +57,13 @@ abstract class AbstractRestClient extends Iterator[String]{
   private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
     val client = HttpClients.createDefault
+    var tries = 4
     try {
-      var tries = 4
       while (tries > 0) {
+        println(s"requesting ${r.getURI}")
        val response = client.execute(r)
+        println(s"get response with status${response.getStatusLine.getStatusCode}")
        if (response.getStatusLine.getStatusCode > 400) {
          tries -= 1
        }
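
For reference, a minimal standalone sketch of the bounded-retry pattern this hunk adjusts: the attempt counter now lives outside the `try`, and each response status is printed before deciding whether to retry. The names `RetrySketch`, `fetch` and `maxTries` are illustrative, not part of the repository.

```scala
// Minimal sketch of a bounded-retry loop, assuming a fetch() that returns an
// HTTP status code; a status above 400 consumes one attempt, anything else stops.
object RetrySketch {
  def requestWithRetries(fetch: () => Int, maxTries: Int = 4): Option[Int] = {
    var tries = maxTries
    while (tries > 0) {
      val status = fetch()
      println(s"got response with status $status")
      if (status > 400) tries -= 1 // transient failure: spend one attempt
      else return Some(status)     // anything else: stop retrying
    }
    None                           // all attempts exhausted
  }

  def main(args: Array[String]): Unit =
    println(requestWithRetries(() => 200)) // Some(200)
}
```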

View File

@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.actionmanager.datacite
 import org.json4s.{DefaultFormats, JValue}
 import org.json4s.jackson.JsonMethods.{compact, parse, render}
-class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends AbstractRestClient {
+class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {
   override def extractInfo(input: String): Unit = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@@ -16,9 +16,15 @@ class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends AbstractRestClient {
       current_index = 0
   }
+  def get_url():String ={
+    val to = if (until> 0) s"$until" else "*"
+    s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20$to]"
+  }
   override def getBufferData(): Unit = {
     if (!complete) {
-      val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20*]")
+      val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(get_url())
       extractInfo(response)
     }
   }
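
A small self-contained sketch of how the query URL above is assembled: a positive `until` closes the `updated` window, otherwise the upper bound stays open with `*`. The helper name `buildDataciteUrl` is illustrative; the endpoint and query syntax mirror the diff.

```scala
// Sketch of the windowed DataCite query URL built by get_url().
object DataciteUrlSketch {
  def buildDataciteUrl(timestamp: Long, blocks: Long, until: Long = -1): String = {
    val to = if (until > 0) s"$until" else "*" // open-ended window when no upper bound is given
    s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20$to]"
  }

  def main(args: Array[String]): Unit = {
    println(buildDataciteUrl(timestamp = 1613135645L, blocks = 1000))                      // open window
    println(buildDataciteUrl(timestamp = 1613135645L, blocks = 1000, until = 1613185645L)) // bounded window
  }
}
```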

View File

@@ -164,9 +164,8 @@ object DataciteToOAFTransformation {
       case _: Throwable => try {
         return Some(LocalDate.parse(a_date, df_it).toString)
       } catch {
-        case _: Throwable => try {
+        case _: Throwable =>
           return None
-        }
       }
     }
   }
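
The hunk above replaces a dangling nested `try` with a plain `None` fallback. A sketch of the same idea expressed with `Try` over a list of formatters; the two patterns here are illustrative, not the formatters defined in `DataciteToOAFTransformation`.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.{Success, Try}

// Try each formatter in turn; give up with None instead of nesting another try/catch.
object DateFallbackSketch {
  private val formats = Seq(DateTimeFormatter.ISO_LOCAL_DATE, DateTimeFormatter.ofPattern("dd-MM-yyyy"))

  def extractDate(aDate: String): Option[String] =
    formats.view
      .map(df => Try(LocalDate.parse(aDate, df).toString))
      .collectFirst { case Success(d) => d }

  def main(args: Array[String]): Unit = {
    println(extractDate("2021-03-26")) // Some(2021-03-26)
    println(extractDate("26-03-2021")) // Some(2021-03-26)
    println(extractDate("not a date")) // None
  }
}
```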

View File

@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.actionmanager.datacite
+import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -15,7 +16,7 @@ import org.apache.spark.sql.functions.max
 import org.slf4j.{Logger, LoggerFactory}
 import java.time.format.DateTimeFormatter._
-import java.time.{LocalDateTime, ZoneOffset}
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
 import scala.io.Source
 object ImportDatacite {
@@ -23,21 +24,20 @@ object ImportDatacite {
   val log: Logger = LoggerFactory.getLogger(ImportDatacite.getClass)
-  def convertAPIStringToDataciteItem(input:String): DataciteType = {
+  def convertAPIStringToDataciteItem(input: String): DataciteType = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     lazy val json: org.json4s.JValue = parse(input)
     val doi = (json \ "attributes" \ "doi").extract[String].toLowerCase
     val isActive = (json \ "attributes" \ "isActive").extract[Boolean]
     val timestamp_string = (json \ "attributes" \ "updated").extract[String]
     val dt = LocalDateTime.parse(timestamp_string, ISO_DATE_TIME)
-    DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli/1000, isActive = isActive, json = input)
+    DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli / 1000, isActive = isActive, json = input)
   }
   def main(args: Array[String]): Unit = {
     val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json")).mkString)
@@ -53,9 +53,13 @@ object ImportDatacite {
     val dataciteDump = parser.get("dataciteDumpPath")
     log.info(s"dataciteDump is $dataciteDump")
-    val hdfsTargetPath =new Path(targetPath)
+    val hdfsTargetPath = new Path(targetPath)
     log.info(s"hdfsTargetPath is $hdfsTargetPath")
+    val spkipImport = parser.get("skipImport")
+    log.info(s"skipImport is $spkipImport")
     val spark: SparkSession = SparkSession.builder()
       .appName(ImportDatacite.getClass.getSimpleName)
       .master(master)
@@ -69,7 +73,7 @@ object ImportDatacite {
     // Because of Maven
     conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
     conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
-    val sc:SparkContext = spark.sparkContext
+    val sc: SparkContext = spark.sparkContext
     sc.setLogLevel("ERROR")
     import spark.implicits._
@@ -84,14 +88,14 @@ object ImportDatacite {
         return a
       if (a == null)
         return b
-      if(a.timestamp >b.timestamp) {
+      if (a.timestamp > b.timestamp) {
         return a
       }
       b
     }
     override def merge(a: DataciteType, b: DataciteType): DataciteType = {
-      reduce(a,b)
+      reduce(a, b)
     }
     override def bufferEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]
@@ -101,69 +105,77 @@ object ImportDatacite {
       override def finish(reduction: DataciteType): DataciteType = reduction
     }
-    val dump:Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
+    val dump: Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
     val ts = dump.select(max("timestamp")).first().getLong(0)
-    log.info(s"last Timestamp is $ts")
-    val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
-    log.info(s"Imported from Datacite API $cnt documents")
+    println(s"last Timestamp is $ts")
+    val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf)
+    println(s"Imported from Datacite API $cnt documents")
     if (cnt > 0) {
-      val inputRdd:RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
+      val inputRdd: RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
         .map(s => s._2.toString)
         .map(s => convertAPIStringToDataciteItem(s))
       spark.createDataset(inputRdd).write.mode(SaveMode.Overwrite).save(s"${targetPath}_dataset")
-      val ds:Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
+      val ds: Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
       dump
         .union(ds)
         .groupByKey(_.doi)
         .agg(dataciteAggregator.toColumn)
-        .map(s=>s._2)
+        .map(s => s._2)
         .repartition(4000)
         .write.mode(SaveMode.Overwrite).save(s"${dataciteDump}_updated")
       val fs = FileSystem.get(sc.hadoopConfiguration)
       fs.delete(new Path(s"$dataciteDump"), true)
-      fs.rename(new Path(s"${dataciteDump}_updated"),new Path(s"$dataciteDump"))
+      fs.rename(new Path(s"${dataciteDump}_updated"), new Path(s"$dataciteDump"))
     }
   }
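
The union/groupByKey/aggregate chain above keeps, for every DOI, the record with the newest timestamp. A plain-collections sketch of that merge rule (no Spark; `Rec` is an illustrative stand-in for `DataciteType`):

```scala
// Self-contained sketch of the merge rule the aggregator implements:
// for each DOI keep the record with the newest timestamp.
object MergeSketch {
  case class Rec(doi: String, timestamp: Long, json: String)

  def mergeLatest(dump: Seq[Rec], fresh: Seq[Rec]): Seq[Rec] =
    (dump ++ fresh)
      .groupBy(_.doi)
      .values
      .map(_.maxBy(_.timestamp)) // same reduce rule: newer timestamp wins
      .toSeq

  def main(args: Array[String]): Unit = {
    val dump  = Seq(Rec("10.1234/a", 100L, "old"))
    val fresh = Seq(Rec("10.1234/a", 200L, "new"), Rec("10.1234/b", 150L, "b"))
    mergeLatest(dump, fresh).sortBy(_.doi).foreach(println)
    // Rec(10.1234/a,200,new)
    // Rec(10.1234/b,150,b)
  }
}
```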
-  private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration):Long = {
-    val client = new DataciteAPIImporter(timestamp*1000, 1000)
+  private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = {
+    var from:Long = timestamp * 1000
+    val delta:Long = 50000000L
+    var client: DataciteAPIImporter = null
+    val now :Long =System.currentTimeMillis()
     var i = 0
     try {
       val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
       try {
         var start: Long = System.currentTimeMillis
-        var end: Long = 0
-        val key: IntWritable = new IntWritable(i)
-        val value: Text = new Text
-        while ( {
-          client.hasNext
-        }) {
+        while (from < now) {
+          client = new DataciteAPIImporter(from, 1000, from + delta)
+          var end: Long = 0
+          val key: IntWritable = new IntWritable(i)
+          val value: Text = new Text
+          while (client.hasNext) {
           key.set({
             i += 1;
             i - 1
           })
           value.set(client.next())
           writer.append(key, value)
           writer.hflush()
           if (i % 1000 == 0) {
             end = System.currentTimeMillis
             val time = (end - start) / 1000.0F
             println(s"Imported $i in $time seconds")
             start = System.currentTimeMillis
           }
         }
+          println(s"updating from value: $from -> ${from+delta}")
+          from = from + delta
+        }
+      } catch {
+        case e: Throwable =>
+          println("Error", e)
       } finally if (writer != null) writer.close()
     }
     i
   }
 }
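
The rewritten `writeSequenceFile` no longer issues a single unbounded query: it walks the span from the last known timestamp up to `now` in fixed slices of `delta` milliseconds and creates one `DataciteAPIImporter` per slice. A sketch of that windowing loop, with the importer swapped for an illustrative `fetchWindow` function:

```scala
// Sketch of the windowed harvesting introduced above: the interval [from, now)
// is processed in slices of `delta` milliseconds, one importer per slice.
object WindowedImportSketch {
  def importInWindows(from: Long, now: Long, delta: Long)(fetchWindow: (Long, Long) => Int): Int = {
    var lower = from
    var total = 0
    while (lower < now) {
      val upper = lower + delta
      total += fetchWindow(lower, upper) // harvest records updated in [lower, upper)
      println(s"updating from value: $lower -> $upper")
      lower = upper
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val imported = importInWindows(from = 0L, now = 150L, delta = 50L) { (lo, hi) => (hi - lo).toInt }
    println(s"imported $imported records") // 150
  }
}
```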

View File

@@ -12,6 +12,12 @@
     "paramDescription": "the path of the Datacite dump",
     "paramRequired": true
   },
+  {
+    "paramName": "s",
+    "paramLongName": "skipImport",
+    "paramDescription": "avoid downloading new items but apply the previous update",
+    "paramRequired": false
+  },
   {
     "paramName": "n",
     "paramLongName": "namenode",

View File

@@ -13,6 +13,11 @@
         <name>nativeInputPath</name>
         <description>the path of the input MDStore</description>
     </property>
+    <property>
+        <name>skipimport</name>
+        <value>false</value>
+        <description>when true, skip downloading new items and only apply the previous update</description>
+    </property>
 </parameters>
@@ -51,6 +56,7 @@
             <arg>-t</arg><arg>${nativeInputPath}</arg>
             <arg>-d</arg><arg>${mdstoreInputPath}</arg>
             <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-s</arg><arg>${skipimport}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
         </spark>
         <ok to="TransformJob"/>
@@ -81,7 +87,7 @@
             <arg>-tr</arg><arg>${isLookupUrl}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
         </spark>
-        <ok to="DeletePathIfExists"/>
+        <ok to="End"/>
         <error to="Kill"/>
     </action>

View File

@@ -57,7 +57,7 @@ object SparkGenerateDOIBoostActionSet {
   val asCRelation = spark.read.load(crossRefRelation).as[Relation]
-    .filter(r => r!= null || (r.getSource != null && r.getTarget != null))
+    .filter(r => r!= null && r.getSource != null && r.getTarget != null)
     .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
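
The corrected predicate needs `&&` throughout: with the old `||`, a null relation short-circuits into dereferencing `r` on the right-hand side, and a non-null relation with null endpoints passes the filter anyway. A tiny sketch of the fixed check (`Rel` is an illustrative stand-in for `Relation`):

```scala
// Null-safe relation filter: all three conditions must hold.
object FilterSketch {
  case class Rel(source: String, target: String)

  def isValid(r: Rel): Boolean =
    r != null && r.source != null && r.target != null

  def main(args: Array[String]): Unit = {
    println(isValid(null))           // false (no NullPointerException)
    println(isValid(Rel("s", null))) // false
    println(isValid(Rel("s", "t")))  // true
  }
}
```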

View File

@@ -59,6 +59,17 @@ class CrossrefMappingTest {
   }
+  @Test
+  def testSum() :Unit = {
+    val from:Long = 1613135645000L
+    val delta:Long = 1000000L
+    println(s"updating from value: $from -> ${from+delta}")
+  }
   @Test
   def testOrcidID() :Unit = {
     val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString