1
0
Fork 0

code refactor renamed datacite package

This commit is contained in:
Sandro La Bruzzo 2021-10-20 17:37:42 +02:00
parent ab3a99d3e9
commit aeeebd573b
19 changed files with 36 additions and 130 deletions

View File

@ -1,41 +0,0 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object ExportActionSetJobNode {
val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(ExportActionSetJobNode.getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o =>DataciteToOAFTransformation.toActionSet(o))
.filter(o => o!= null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
}

View File

@ -1,46 +0,0 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object FilterCrossrefEntitiesSpark {
val log: Logger = LoggerFactory.getLogger(getClass.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
log.info("sourcePath: {}", sourcePath)
val targetPath = parser.get("targetPath")
log.info("targetPath: {}", targetPath)
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result]
val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf]
d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest} import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest}
import org.apache.http.entity.StringEntity import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} import org.apache.http.impl.client.HttpClientBuilder
import java.io.IOException
abstract class AbstractRestClient extends Iterator[String] { abstract class AbstractRestClient extends Iterator[String] {

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods.{compact, parse, render} import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.{DefaultFormats, JValue}
class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient { class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup

View File

@ -1,10 +1,11 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
@ -21,7 +22,7 @@ object GenerateDataciteDatasetSpark {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf = new SparkConf val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args) parser.parseArgument(args)
val master = parser.get("master") val master = parser.get("master")
val sourcePath = parser.get("sourcePath") val sourcePath = parser.get("sourcePath")
@ -36,12 +37,12 @@ object GenerateDataciteDatasetSpark {
.master(master) .master(master)
.getOrCreate() .getOrCreate()
import spark.implicits._
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord] implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
import spark.implicits._
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
@ -54,10 +55,10 @@ object GenerateDataciteDatasetSpark {
.filter(d => d.isActive) .filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
.filter(d => d != null) .filter(d => d != null)
.flatMap(i=> fixRelations(i)).filter(i => i != null) .flatMap(i => fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath) .write.mode(SaveMode.Overwrite).save(targetPath)
val total_items =spark.read.load(targetPath).as[Oaf].count() val total_items = spark.read.load(targetPath).as[Oaf].count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
} }
} }

View File

@ -1,6 +1,5 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path} import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@ -9,14 +8,14 @@ import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
import org.apache.spark.SparkContext import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions.max
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.time.format.DateTimeFormatter._ import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.{LocalDate, LocalDateTime, ZoneOffset} import java.time.{LocalDateTime, ZoneOffset}
import scala.io.Source import scala.io.Source
object ImportDatacite { object ImportDatacite {
@ -138,11 +137,11 @@ object ImportDatacite {
} }
} }
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = { private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs: Int): Long = {
var from:Long = timestamp * 1000 var from: Long = timestamp * 1000
val delta:Long = 100000000L val delta: Long = 100000000L
var client: DataciteAPIImporter = null var client: DataciteAPIImporter = null
val now :Long =System.currentTimeMillis() val now: Long = System.currentTimeMillis()
var i = 0 var i = 0
try { try {
val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text])) val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
@ -168,7 +167,7 @@ object ImportDatacite {
start = System.currentTimeMillis start = System.currentTimeMillis
} }
} }
println(s"updating from value: $from -> ${from+delta}") println(s"updating from value: $from -> ${from + delta}")
from = from + delta from = from + delta
} }
} catch { } catch {
@ -183,4 +182,4 @@ object ImportDatacite {
i i
} }
} }

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.functions.max import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.{Date, Locale} import java.util.Locale
import scala.io.Source import scala.io.Source
object SparkDownloadUpdateDatacite { object SparkDownloadUpdateDatacite {
@ -21,7 +17,7 @@ object SparkDownloadUpdateDatacite {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf = new SparkConf val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args) parser.parseArgument(args)
val master = parser.get("master") val master = parser.get("master")
val sourcePath = parser.get("sourcePath") val sourcePath = parser.get("sourcePath")
@ -42,9 +38,9 @@ object SparkDownloadUpdateDatacite {
import spark.implicits._ import spark.implicits._
val maxDate:String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) val maxDate: String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0)
val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US) val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
val string_to_date =ISO8601FORMAT.parse(maxDate) val string_to_date = ISO8601FORMAT.parse(maxDate)
val ts = string_to_date.getTime val ts = string_to_date.getTime

View File

@ -28,7 +28,7 @@
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>ImportDatacite</name> <name>ImportDatacite</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite</class> <class>eu.dnetlib.dhp.datacite.ImportDatacite</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}

View File

@ -47,7 +47,7 @@
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>TransformJob</name> <name>TransformJob</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark</class> <class>eu.dnetlib.dhp.datacite.GenerateDataciteDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}

View File

@ -1,8 +1,7 @@
package eu.dnetlib.dhp.actionmanager.datacite package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.databind.SerializationFeature
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith